lib/rocket_job/plugins/job/worker.rb in rocketjob-3.0.5 vs lib/rocket_job/plugins/job/worker.rb in rocketjob-3.1.0
- old
+ new
@@ -6,29 +6,18 @@
module Job
module Worker
extend ActiveSupport::Concern
module ClassMethods
- # Run this job later
- #
- # Saves it to the database for processing later by workers
- def perform_later(args, &block)
- if RocketJob::Config.inline_mode
- perform_now(args, &block)
- else
- job = new(args)
- block.call(job) if block
- job.save!
- job
- end
- end
-
# Run this job now.
#
# The job is not saved to the database since it is processed entriely in memory
# As a result before_save and before_destroy callbacks will not be called.
# Validations are still called however prior to calling #perform
+ #
+ # Note:
+ # - Only batch throttles are checked when perform_now is called.
def perform_now(args, &block)
job = new(args)
block.call(job) if block
job.perform_now
job
@@ -54,14 +43,12 @@
# Batch Job
return job
when job.expired?
job.rocket_job_fail_on_exception!(worker_name) { job.destroy }
logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}"
- when job.throttle_exceeded?
- logger.debug { "Throttle exceeded with job #{job.class.name}, id:#{job.id}" }
- # Add jobs filter to the current filter
- job.throttle_merge_filter(filter, job.throttle_filter)
+ when new_filter = job.send(:rocket_job_evaluate_throttles)
+ rocket_job_merge_filter(filter, new_filter)
# Restore retrieved job so that other workers can process it later
job.set(worker_name: nil, state: :queued)
else
job.worker_name = worker_name
job.rocket_job_fail_on_exception!(worker_name) do
@@ -76,9 +63,35 @@
def requeue_dead_server(server_name)
# Need to requeue paused, failed since user may have transitioned job before it finished
where(:state.in => [:running, :paused, :failed]).each do |job|
job.requeue!(server_name) if job.may_requeue?(server_name)
end
+ end
+
+ # DEPRECATED
+ def perform_later(args, &block)
+ if RocketJob::Config.inline_mode
+ perform_now(args, &block)
+ else
+ job = new(args)
+ block.call(job) if block
+ job.save!
+ job
+ end
+ end
+
+ private
+
+ def rocket_job_merge_filter(target, source)
+ source.each_pair do |k, v|
+ target[k] =
+ if previous = target[k]
+ v.is_a?(Array) ? previous + v : v
+ else
+ v
+ end
+ end
+ target
end
end
# Runs the job now in the current thread.
#