lib/acidic_job/awaiting.rb in acidic_job-1.0.0.pre22 vs lib/acidic_job/awaiting.rb in acidic_job-1.0.0.pre23

- old
+ new

@@ -6,43 +6,63 @@ module Awaiting
   extend ActiveSupport::Concern

   private

-  def enqueue_step_parallel_jobs(jobs_or_jobs_getter, run, step_result)
-    # `batch` is available from Sidekiq::Pro
-    raise SidekiqBatchRequired unless defined?(Sidekiq::Batch)
+  def was_awaited_job?
+    (acidic_job_run.present? && acidic_job_run.awaited_by.present?) ||
+      (staged_job_run.present? && staged_job_run.awaited_by.present?)
+  end

-    jobs = case jobs_or_jobs_getter
-           when Array
-             jobs_or_jobs_getter
-           when Symbol, String
-             method(jobs_or_jobs_getter).call
-           end
+  def reenqueue_awaited_by_job
+    run = staged_job_run&.awaited_by || acidic_job_run&.awaited_by

-    step_batch = Sidekiq::Batch.new
-    # step_batch.description = "AcidicJob::Workflow Step: #{step}"
-    step_batch.on(
-      :success,
-      "#{self.class.name}#step_done",
-      # NOTE: options are marshalled through JSON so use only basic types.
-      {
-        "run_id" => run.id,
-        "step_result_yaml" => step_result.to_yaml.strip,
-        "parent_worker" => self.class.name,
-        "job_names" => jobs.map { |job| job_name(job) }
-      }
-    )
+    return unless run

-    # NOTE: The jobs method is atomic.
-    # All jobs created in the block are actually pushed atomically at the end of the block.
-    # If an error is raised, none of the jobs will go to Redis.
-    step_batch.jobs do
-      jobs.each do |job|
-        worker, args, kwargs = job_args_and_kwargs(job)
+    current_step = run.workflow[run.recovery_point.to_s]
+    step_result = run.returning_to

-        worker.perform_async(*args, **kwargs)
+    job = run.job_class.constantize.deserialize(run.serialized_job)
+    # this needs to be explicitly set so that `was_workflow_job?` appropriately returns `true`
+    # which is what the `after_finish :reenqueue_awaited_by_job` check needs
+    job.instance_variable_set(:@acidic_job_run, run)
+
+    step = Step.new(current_step, run, job, step_result)
+    # TODO: WRITE REGRESSION TESTS FOR PARALLEL JOB FAILING AND RETRYING THE ORIGINAL STEP
+    step.progress
+
+    return if run.finished?
+
+    # job = job_class.constantize.deserialize(serialized_staged_job)
+    # job.enqueue
+
+    # when a batch of jobs for a step succeeds, we begin processing the `AcidicJob::Run` record again
+    process_run(run)
+  end
+
+  def enqueue_step_parallel_jobs(jobs_or_jobs_getter, run, step_result)
+    awaited_jobs = case jobs_or_jobs_getter
+                   when Array
+                     jobs_or_jobs_getter
+                   when Symbol, String
+                     method(jobs_or_jobs_getter).call
+                   end
+
+    AcidicJob::Run.transaction do
+      awaited_jobs.each do |awaited_job|
+        worker_class, args, kwargs = job_args_and_kwargs(awaited_job)
+
+        job = worker_class.new(*args, **kwargs)
+
+        AcidicJob::Run.create!(
+          staged: true,
+          awaited_by: run,
+          job_class: worker_class,
+          serialized_job: job.serialize_job(*args, **kwargs),
+          idempotency_key: job.idempotency_key
+        )
+        run.update(returning_to: step_result)
       end
     end
   end

   def step_done(_status, options)
@@ -54,20 +74,9 @@
     # TODO: WRITE REGRESSION TESTS FOR PARALLEL JOB FAILING AND RETRYING THE ORIGINAL STEP
     step.progress

     # when a batch of jobs for a step succeeds, we begin processing the `AcidicJob::Run` record again
     process_run(run)
-  end
-
-  def job_name(job)
-    case job
-    when Class, Symbol
-      job.to_s
-    when String
-      job
-    else
-      job.class.name
-    end
   end

   def job_args_and_kwargs(job)
     case job
     when Class