lib/acidic_job/processor.rb in acidic_job-1.0.0.beta.6 vs lib/acidic_job/processor.rb in acidic_job-1.0.0.beta.7

- old
+ new

@@ -17,11 +17,11 @@ break if @run.finished? if !@run.known_recovery_point? raise UnknownRecoveryPoint, "Defined workflow does not reference this step: #{@run.current_step_name.inspect}" - elsif !Array(awaited_jobs = @run.current_step_hash.fetch("awaits", [])).compact.empty? + elsif Array(awaited_jobs = jobs_from(@run.current_step_awaits)).compact.any? # We only execute the current step, without progressing to the next step. # This ensures that any failures in parallel jobs will have this step retried in the main workflow step_result = @workflow.execute_current_step # We allow the `#step_done` method to manage progressing the recovery_point to the next step, # and then calling `process_run` to restart the main workflow on the next step. @@ -45,18 +45,16 @@ @run.succeeded? end private - def enqueue_awaited_jobs(jobs_or_jobs_getter, step_result) - awaited_jobs = jobs_from(jobs_or_jobs_getter) - + def enqueue_awaited_jobs(awaited_jobs, step_result) AcidicJob.logger.log_run_event("Enqueuing #{awaited_jobs.count} awaited jobs...", @job, @run) # All jobs created in the block are pushed atomically at the end of the block. AcidicJob::Run.transaction do awaited_jobs.each do |awaited_job| - worker_class, args = job_args_and_kwargs(awaited_job) + worker_class, args = job_and_args(awaited_job) job = worker_class.new(*args) AcidicJob::Run.await!(job, by: @run, return_to: step_result) end @@ -79,10 +77,10 @@ raise UnknownAwaitedJob, "Invalid `awaits`; must be either an jobs Array or method name, was: #{jobs_or_jobs_getter.class.name}" end end - def job_args_and_kwargs(job) + def job_and_args(job) case job when Class [job, []] else [