lib/acidic_job/awaiting.rb in acidic_job-1.0.0.pre22 vs lib/acidic_job/awaiting.rb in acidic_job-1.0.0.pre23
- old
+ new
@@ -6,43 +6,63 @@
module Awaiting
extend ActiveSupport::Concern
private
- def enqueue_step_parallel_jobs(jobs_or_jobs_getter, run, step_result)
- # `batch` is available from Sidekiq::Pro
- raise SidekiqBatchRequired unless defined?(Sidekiq::Batch)
+ def was_awaited_job?
+ (acidic_job_run.present? && acidic_job_run.awaited_by.present?) ||
+ (staged_job_run.present? && staged_job_run.awaited_by.present?)
+ end
- jobs = case jobs_or_jobs_getter
- when Array
- jobs_or_jobs_getter
- when Symbol, String
- method(jobs_or_jobs_getter).call
- end
+ def reenqueue_awaited_by_job
+ run = staged_job_run&.awaited_by || acidic_job_run&.awaited_by
- step_batch = Sidekiq::Batch.new
- # step_batch.description = "AcidicJob::Workflow Step: #{step}"
- step_batch.on(
- :success,
- "#{self.class.name}#step_done",
- # NOTE: options are marshalled through JSON so use only basic types.
- {
- "run_id" => run.id,
- "step_result_yaml" => step_result.to_yaml.strip,
- "parent_worker" => self.class.name,
- "job_names" => jobs.map { |job| job_name(job) }
- }
- )
+ return unless run
- # NOTE: The jobs method is atomic.
- # All jobs created in the block are actually pushed atomically at the end of the block.
- # If an error is raised, none of the jobs will go to Redis.
- step_batch.jobs do
- jobs.each do |job|
- worker, args, kwargs = job_args_and_kwargs(job)
+ current_step = run.workflow[run.recovery_point.to_s]
+ step_result = run.returning_to
- worker.perform_async(*args, **kwargs)
+ job = run.job_class.constantize.deserialize(run.serialized_job)
+ # this needs to be explicitly set so that `was_workflow_job?` appropriately returns `true`
+ # which is what the `after_finish :reenqueue_awaited_by_job` check needs
+ job.instance_variable_set(:@acidic_job_run, run)
+
+ step = Step.new(current_step, run, job, step_result)
+ # TODO: WRITE REGRESSION TESTS FOR PARALLEL JOB FAILING AND RETRYING THE ORIGINAL STEP
+ step.progress
+
+ return if run.finished?
+
+ # job = job_class.constantize.deserialize(serialized_staged_job)
+ # job.enqueue
+
+ # when a batch of jobs for a step succeeds, we begin processing the `AcidicJob::Run` record again
+ process_run(run)
+ end
+
+ def enqueue_step_parallel_jobs(jobs_or_jobs_getter, run, step_result)
+ awaited_jobs = case jobs_or_jobs_getter
+ when Array
+ jobs_or_jobs_getter
+ when Symbol, String
+ method(jobs_or_jobs_getter).call
+ end
+
+ AcidicJob::Run.transaction do
+ awaited_jobs.each do |awaited_job|
+ worker_class, args, kwargs = job_args_and_kwargs(awaited_job)
+
+ job = worker_class.new(*args, **kwargs)
+
+ AcidicJob::Run.create!(
+ staged: true,
+ awaited_by: run,
+ job_class: worker_class,
+ serialized_job: job.serialize_job(*args, **kwargs),
+ idempotency_key: job.idempotency_key
+ )
+ run.update(returning_to: step_result)
end
end
end
def step_done(_status, options)
@@ -54,20 +74,9 @@
# TODO: WRITE REGRESSION TESTS FOR PARALLEL JOB FAILING AND RETRYING THE ORIGINAL STEP
step.progress
# when a batch of jobs for a step succeeds, we begin processing the `AcidicJob::Run` record again
process_run(run)
- end
-
- def job_name(job)
- case job
- when Class, Symbol
- job.to_s
- when String
- job
- else
- job.class.name
- end
end
def job_args_and_kwargs(job)
case job
when Class