lib/acidic_job/run.rb in acidic_job-1.0.0.beta.10 vs lib/acidic_job/run.rb in acidic_job-1.0.0.pre1

- old
+ new

@@ -1,303 +1,77 @@ # frozen_string_literal: true require "active_record" require "global_id" -require "base64" require "active_support/core_ext/object/with_options" -require "active_support/core_ext/module/concerning" -require "active_support/concern" module AcidicJob class Run < ActiveRecord::Base include GlobalID::Identification FINISHED_RECOVERY_POINT = "FINISHED" - STAGED_JOB_ID_PREFIX = "STG" - STAGED_JOB_ID_DELIMITER = "__" - IDEMPOTENCY_KEY_LOCK_TIMEOUT_SECONDS = 2 self.table_name = "acidic_job_runs" - validates :idempotency_key, presence: true - validate :not_awaited_but_unstaged + after_create_commit :enqueue_staged_job, if: :staged? - def self.clear_finished - # over-write any pre-existing relation queries on `recovery_point` and/or `error_object` - to_purge = finished + serialize :error_object + serialize :serialized_job + serialize :workflow + store :attr_accessors - count = to_purge.count + validates :staged, inclusion: { in: [true, false] } # uses database default + validates :serialized_job, presence: true + validates :idempotency_key, presence: true, uniqueness: true + validates :job_class, presence: true - return 0 if count.zero? + scope :staged, -> { where(staged: true) } + scope :unstaged, -> { where(staged: false) } + scope :finished, -> { where(recovery_point: FINISHED_RECOVERY_POINT) } + scope :running, -> { where.not(recovery_point: FINISHED_RECOVERY_POINT) } - AcidicJob.logger.info("Deleting #{count} finished AcidicJob runs") - to_purge.delete_all + with_options unless: :staged? do + validates :last_run_at, presence: true + validates :recovery_point, presence: true + validates :workflow, presence: true end - def succeeded? - finished? && !errored? + def finished? + recovery_point == FINISHED_RECOVERY_POINT end - concerning :Awaitable do - included do - belongs_to :awaited_by, class_name: "AcidicJob::Run", optional: true - has_many :batched_runs, class_name: "AcidicJob::Run", foreign_key: "awaited_by_id" - - scope :awaited, -> { where.not(awaited_by: nil) } - scope :unawaited, -> { where(awaited_by: nil) } - - after_update_commit :proceed_with_parent, if: :finished? - - serialize :returning_to, AcidicJob::Serializer - end - - class_methods do - def await!(job, by:, return_to:) - create!( - staged: true, - awaited_by: by, - job_class: job.class.name, - serialized_job: job.serialize, - idempotency_key: job.idempotency_key - ) - by.update(returning_to: return_to) - end - end - - def awaited? - awaited_by.present? - end - - private - - def proceed_with_parent - return unless finished? - return unless awaited_by.present? - return if awaited_by.batched_runs.outstanding.any? - - AcidicJob.logger.log_run_event("Proceeding with parent job...", job, self) - awaited_by.unlock! - awaited_by.proceed - AcidicJob.logger.log_run_event("Proceeded with parent job.", job, self) - end - - protected - - def proceed - # this needs to be explicitly set so that `was_workflow_job?` appropriately returns `true` - # TODO: replace this with some way to check the type of the job directly - # either via class method or explicit module inclusion - job.instance_variable_set(:@acidic_job_run, self) - - workflow = Workflow.new(self, job, returning_to) - # TODO: WRITE REGRESSION TESTS FOR PARALLEL JOB FAILING AND RETRYING THE ORIGINAL STEP - workflow.progress_to_next_step - - # when a batch of jobs for a step succeeds, we begin processing the `AcidicJob::Run` record again - return if finished? - - AcidicJob.logger.log_run_event("Re-enqueuing parent job...", job, self) - enqueue_job - AcidicJob.logger.log_run_event("Re-enqueued parent job.", job, self) - end + def succeeded? + finished? && !failed? end - concerning :Stageable do - included do - after_create_commit :enqueue_job, if: :staged? - - validates :staged, inclusion: { in: [true, false] } # uses database default - - scope :staged, -> { where(staged: true) } - scope :unstaged, -> { where(staged: false) } - end - - class_methods do - def stage!(job) - create!( - staged: true, - job_class: job.class.name, - serialized_job: job.serialize, - idempotency_key: job.try(:idempotency_key) || job.job_id - ) - end - end - - private - - def job_id - return idempotency_key unless staged? - - # encode the identifier for this record in the job ID - global_id = to_global_id.to_s.remove("gid://") - # base64 encoding for minimal security - encoded_global_id = Base64.urlsafe_encode64(global_id, padding: false) - - [ - STAGED_JOB_ID_PREFIX, - idempotency_key, - encoded_global_id - ].join(STAGED_JOB_ID_DELIMITER) - end + def failed? + error_object.present? end - concerning :Workflowable do - included do - serialize :workflow, AcidicJob::Serializer - serialize :error_object, AcidicJob::Serializer - store :attr_accessors, coder: AcidicJob::Serializer + private - with_options unless: :staged? do - validates :last_run_at, presence: true - validates :recovery_point, presence: true - validates :workflow, presence: true - end - end + def enqueue_staged_job + return unless staged? - def workflow? - self[:workflow].present? - end + # encode the identifier for this record in the job ID + # base64 encoding for minimal security + global_id = to_global_id.to_s.remove("gid://") + encoded_global_id = Base64.encode64(global_id).strip + staged_job_id = "STG_#{idempotency_key}__#{encoded_global_id}" - def attr_accessors - self[:attr_accessors] || {} - end + serialized_staged_job = if serialized_job.key?("jid") + serialized_job.merge("jid" => staged_job_id) + elsif serialized_job.key?("job_id") + serialized_job.merge("job_id" => staged_job_id) + else + raise UnknownSerializedJobIdentifier + end - def current_step_name - recovery_point - end + job = job_class.constantize.deserialize(serialized_staged_job) - def current_step_hash - workflow[current_step_name] - end + job.enqueue - def next_step_name - current_step_hash.fetch("then") - end - - def current_step_awaits - current_step_hash["awaits"] - end - - def next_step_finishes? - next_step_name.to_s == FINISHED_RECOVERY_POINT - end - - def current_step_finished? - current_step_name.to_s == FINISHED_RECOVERY_POINT - end - end - - concerning :Jobbable do - included do - serialize :serialized_job, JSON - - validates :serialized_job, presence: true - validates :job_class, presence: true - end - - def job - return @job if defined? @job - - serialized_job_for_run = serialized_job.merge("job_id" => job_id) - job_class_for_run = job_class.constantize - - @job = job_class_for_run.deserialize(serialized_job_for_run) - end - - def enqueue_job - job.enqueue - - # NOTE: record will be deleted after the job has successfully been performed - true - end - end - - concerning :Finishable do - included do - scope :finished, -> { where(recovery_point: FINISHED_RECOVERY_POINT) } - scope :outstanding, lambda { - where.not(recovery_point: FINISHED_RECOVERY_POINT).or(where(recovery_point: [nil, ""])) - } - end - - def finish! - finish and unlock and save! - end - - def finish - self.recovery_point = FINISHED_RECOVERY_POINT - self - end - - def finished? - recovery_point.to_s == FINISHED_RECOVERY_POINT - end - end - - concerning :Unlockable do - included do - scope :unlocked, -> { where(locked_at: nil) } - scope :locked, -> { where.not(locked_at: nil) } - end - - def unlock! - unlock and save! - end - - def unlock - self.locked_at = nil - self - end - - def locked? - locked_at.present? - end - - def lock_active? - return false if locked_at.nil? - - locked_at > Time.current - IDEMPOTENCY_KEY_LOCK_TIMEOUT_SECONDS - end - end - - concerning :ErrorStoreable do - included do - scope :unerrored, -> { where(error_object: nil) } - scope :errored, -> { where.not(error_object: nil) } - end - - def store_error!(error) - reload and unlock and store_error(error) and save! - end - - def store_error(error) - self.error_object = error - self - end - - def errored? - error_object.present? - end - end - - concerning :Recoverable do - def recover_to!(point) - recover_to(point) and save! - end - - def recover_to(point) - self.recovery_point = point - self - end - - def known_recovery_point? - workflow.key?(recovery_point) - end - end - - def not_awaited_but_unstaged - return true unless awaited? && !staged? - - errors.add(:base, "cannot be awaited by another job but not staged") + # NOTE: record will be deleted after the job has successfully been performed + true end end end