lib/rocket_job/plugins/job/worker.rb in rocketjob-3.0.0.beta2 vs lib/rocket_job/plugins/job/worker.rb in rocketjob-3.0.0.beta3

- old
+ new

@@ -46,19 +46,24 @@ # skip_job_ids [Array<BSON::ObjectId>] # Job ids to exclude when looking for the next job # # Note: # If a job is in queued state it will be started - def rocket_job_next_job(worker_name, skip_job_ids = nil) - while (job = rocket_job_retrieve(worker_name, skip_job_ids)) + def rocket_job_next_job(worker_name, filter = {}) + while (job = rocket_job_retrieve(worker_name, filter)) case when job.running? # Batch Job return job when job.expired? job.rocket_job_fail_on_exception!(worker_name) { job.destroy } logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}" + when job.throttle_exceeded? + # Add jobs filter to the current filter + throttle_merge_filter(filter, job.throttle_filter) + # Restore retrieved job so that other workers can process it later + job.set(worker_name: nil, state: :queued) else job.worker_name = worker_name job.rocket_job_fail_on_exception!(worker_name) do defined?(RocketJobPro) ? job.start! : job.start end @@ -68,11 +73,11 @@ end # Requeues all jobs that were running on a server that died def requeue_dead_server(server_name) # Need to requeue paused, failed since user may have transitioned job before it finished - where(:state.in => [:running, :paused, :faled]).each do |job| + where(:state.in => [:running, :paused, :failed]).each do |job| job.requeue!(server_name) if job.may_requeue?(server_name) end end end @@ -134,22 +139,27 @@ # # If an exception is thrown the job is marked as failed and the exception # is set in the job itself. # # Thread-safe, can be called by multiple threads at the same time - def rocket_job_work(worker, re_raise_exceptions = false) + def rocket_job_work(worker, re_raise_exceptions = false, filter = nil) raise(ArgumentError, 'Job must be started before calling #rocket_job_work') unless running? rocket_job_fail_on_exception!(worker.name, re_raise_exceptions) do - run_callbacks :perform do - # Allow callbacks to fail, complete or abort the job - if running? - ret = perform - if collect_output? - # Result must be a Hash, if not put it in a Hash - self.result = ret.is_a?(Hash) ? ret : {'result' => ret} - end + if _perform_callbacks.empty? + @rocket_job_output = perform + else + # Allows @rocket_job_output to be modified by after/around callbacks + run_callbacks(:perform) do + # Allow callbacks to fail, complete or abort the job + @rocket_job_output = perform if running? end end + + if collect_output? + # Result must be a Hash, if not put it in a Hash + self.result = @rocket_job_output.is_a?(Hash) ? @rocket_job_output : {'result' => @rocket_job_output} + end + if new_record? || destroyed? complete if may_complete? else may_complete? ? complete! : save! end