lib/acidic_job/processor.rb in acidic_job-1.0.0.beta.6 vs lib/acidic_job/processor.rb in acidic_job-1.0.0.beta.7
- old
+ new
@@ -17,11 +17,11 @@
break if @run.finished?
if !@run.known_recovery_point?
raise UnknownRecoveryPoint,
"Defined workflow does not reference this step: #{@run.current_step_name.inspect}"
- elsif !Array(awaited_jobs = @run.current_step_hash.fetch("awaits", [])).compact.empty?
+ elsif Array(awaited_jobs = jobs_from(@run.current_step_awaits)).compact.any?
# We only execute the current step, without progressing to the next step.
# This ensures that any failures in parallel jobs will have this step retried in the main workflow
step_result = @workflow.execute_current_step
# We allow the `#step_done` method to manage progressing the recovery_point to the next step,
# and then calling `process_run` to restart the main workflow on the next step.
@@ -45,18 +45,16 @@
@run.succeeded?
end
private
- def enqueue_awaited_jobs(jobs_or_jobs_getter, step_result)
- awaited_jobs = jobs_from(jobs_or_jobs_getter)
-
+ def enqueue_awaited_jobs(awaited_jobs, step_result)
AcidicJob.logger.log_run_event("Enqueuing #{awaited_jobs.count} awaited jobs...", @job, @run)
# All jobs created in the block are pushed atomically at the end of the block.
AcidicJob::Run.transaction do
awaited_jobs.each do |awaited_job|
- worker_class, args = job_args_and_kwargs(awaited_job)
+ worker_class, args = job_and_args(awaited_job)
job = worker_class.new(*args)
AcidicJob::Run.await!(job, by: @run, return_to: step_result)
end
@@ -79,10 +77,10 @@
raise UnknownAwaitedJob,
"Invalid `awaits`; must be either an jobs Array or method name, was: #{jobs_or_jobs_getter.class.name}"
end
end
- def job_args_and_kwargs(job)
+ def job_and_args(job)
case job
when Class
[job, []]
else
[