lib/rocket_job/plugins/job/worker.rb in rocketjob-3.0.0.beta2 vs lib/rocket_job/plugins/job/worker.rb in rocketjob-3.0.0.beta3
- old
+ new
@@ -46,19 +46,24 @@
# skip_job_ids [Array<BSON::ObjectId>]
# Job ids to exclude when looking for the next job
#
# Note:
# If a job is in queued state it will be started
- def rocket_job_next_job(worker_name, skip_job_ids = nil)
- while (job = rocket_job_retrieve(worker_name, skip_job_ids))
+ def rocket_job_next_job(worker_name, filter = {})
+ while (job = rocket_job_retrieve(worker_name, filter))
case
when job.running?
# Batch Job
return job
when job.expired?
job.rocket_job_fail_on_exception!(worker_name) { job.destroy }
logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}"
+ when job.throttle_exceeded?
+ # Add jobs filter to the current filter
+ throttle_merge_filter(filter, job.throttle_filter)
+ # Restore retrieved job so that other workers can process it later
+ job.set(worker_name: nil, state: :queued)
else
job.worker_name = worker_name
job.rocket_job_fail_on_exception!(worker_name) do
defined?(RocketJobPro) ? job.start! : job.start
end
@@ -68,11 +73,11 @@
end
# Requeues all jobs that were running on a server that died
def requeue_dead_server(server_name)
# Need to requeue paused, failed since user may have transitioned job before it finished
- where(:state.in => [:running, :paused, :faled]).each do |job|
+ where(:state.in => [:running, :paused, :failed]).each do |job|
job.requeue!(server_name) if job.may_requeue?(server_name)
end
end
end
@@ -134,22 +139,27 @@
#
# If an exception is thrown the job is marked as failed and the exception
# is set in the job itself.
#
# Thread-safe, can be called by multiple threads at the same time
- def rocket_job_work(worker, re_raise_exceptions = false)
+ def rocket_job_work(worker, re_raise_exceptions = false, filter = nil)
raise(ArgumentError, 'Job must be started before calling #rocket_job_work') unless running?
rocket_job_fail_on_exception!(worker.name, re_raise_exceptions) do
- run_callbacks :perform do
- # Allow callbacks to fail, complete or abort the job
- if running?
- ret = perform
- if collect_output?
- # Result must be a Hash, if not put it in a Hash
- self.result = ret.is_a?(Hash) ? ret : {'result' => ret}
- end
+ if _perform_callbacks.empty?
+ @rocket_job_output = perform
+ else
+ # Allows @rocket_job_output to be modified by after/around callbacks
+ run_callbacks(:perform) do
+ # Allow callbacks to fail, complete or abort the job
+ @rocket_job_output = perform if running?
end
end
+
+ if collect_output?
+ # Result must be a Hash, if not put it in a Hash
+ self.result = @rocket_job_output.is_a?(Hash) ? @rocket_job_output : {'result' => @rocket_job_output}
+ end
+
if new_record? || destroyed?
complete if may_complete?
else
may_complete? ? complete! : save!
end