lib/rocket_job/plugins/job/worker.rb in rocketjob-3.4.3 vs lib/rocket_job/plugins/job/worker.rb in rocketjob-3.5.0
- old
+ new
@@ -8,19 +8,19 @@
extend ActiveSupport::Concern
module ClassMethods
# Run this job now.
#
- # The job is not saved to the database since it is processed entriely in memory
+ # The job is not saved to the database since it is processed entirely in memory
# As a result before_save and before_destroy callbacks will not be called.
# Validations are still called however prior to calling #perform
#
# Note:
# - Only batch throttles are checked when perform_now is called.
- def perform_now(args, &block)
+ def perform_now(args)
job = new(args)
- block.call(job) if block
+ yield(job) if block_given?
job.perform_now
job
end
# Returns the next job to work on in priority based order
@@ -36,18 +36,17 @@
#
# Note:
# If a job is in queued state it will be started
def rocket_job_next_job(worker_name, filter = {})
while (job = rocket_job_retrieve(worker_name, filter))
- case
- when job.running?
- # Batch Job
- return job
- when job.expired?
+ # Batch Job?
+ return job if job.running?
+
+ if job.expired?
job.rocket_job_fail_on_exception!(worker_name) { job.destroy }
logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}"
- when new_filter = job.send(:rocket_job_evaluate_throttles)
+ elsif (new_filter = job.send(:rocket_job_evaluate_throttles))
rocket_job_merge_filter(filter, new_filter)
# Restore retrieved job so that other workers can process it later
job.set(worker_name: nil, state: :queued)
else
job.worker_name = worker_name
@@ -60,33 +59,33 @@
end
# Requeues all jobs that were running on a server that died
def requeue_dead_server(server_name)
# Need to requeue paused, failed since user may have transitioned job before it finished
- where(:state.in => [:running, :paused, :failed]).each do |job|
+ where(:state.in => %i[running paused failed]).each do |job|
job.requeue!(server_name) if job.may_requeue?(server_name)
end
end
# DEPRECATED
def perform_later(args, &block)
if RocketJob::Config.inline_mode
perform_now(args, &block)
else
job = new(args)
- block.call(job) if block
+ yield(job) if block
job.save!
job
end
end
private
def rocket_job_merge_filter(target, source)
source.each_pair do |k, v|
target[k] =
- if previous = target[k]
+ if (previous = target[k])
v.is_a?(Array) ? previous + v : v
else
v
end
end
@@ -134,16 +133,14 @@
rescue Exception => exc
if failed? || !may_fail?
self.exception = JobException.from_exception(exc)
exception.worker_name = worker_name
save! unless new_record? || destroyed?
+ elsif new_record? || destroyed?
+ fail(worker_name, exc)
else
- if new_record? || destroyed?
- fail(worker_name, exc)
- else
- fail!(worker_name, exc)
- end
+ fail!(worker_name, exc)
end
raise exc if re_raise_exceptions
end
# Works on this job
@@ -152,11 +149,11 @@
#
# If an exception is thrown the job is marked as failed and the exception
# is set in the job itself.
#
# Thread-safe, can be called by multiple threads at the same time
- def rocket_job_work(worker, re_raise_exceptions = false, filter = nil)
+ def rocket_job_work(worker, re_raise_exceptions = false, _filter = nil)
raise(ArgumentError, 'Job must be started before calling #rocket_job_work') unless running?
rocket_job_fail_on_exception!(worker.name, re_raise_exceptions) do
if _perform_callbacks.empty?
@rocket_job_output = perform
else
@@ -184,10 +181,9 @@
# Returns [Hash<String:[Array<ActiveWorker>]>] All servers actively working on this job
def rocket_job_active_workers(server_name = nil)
return [] if !running? || (server_name && !worker_on_server?(server_name))
[ActiveWorker.new(worker_name, started_at, self)]
end
-
end
end
end
end