lib/rocket_job/plugins/job/worker.rb in rocketjob-3.0.5 vs lib/rocket_job/plugins/job/worker.rb in rocketjob-3.1.0

- old
+ new

@@ -6,29 +6,18 @@ module Job module Worker extend ActiveSupport::Concern module ClassMethods - # Run this job later - # - # Saves it to the database for processing later by workers - def perform_later(args, &block) - if RocketJob::Config.inline_mode - perform_now(args, &block) - else - job = new(args) - block.call(job) if block - job.save! - job - end - end - # Run this job now. # # The job is not saved to the database since it is processed entriely in memory # As a result before_save and before_destroy callbacks will not be called. # Validations are still called however prior to calling #perform + # + # Note: + # - Only batch throttles are checked when perform_now is called. def perform_now(args, &block) job = new(args) block.call(job) if block job.perform_now job @@ -54,14 +43,12 @@ # Batch Job return job when job.expired? job.rocket_job_fail_on_exception!(worker_name) { job.destroy } logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}" - when job.throttle_exceeded? - logger.debug { "Throttle exceeded with job #{job.class.name}, id:#{job.id}" } - # Add jobs filter to the current filter - job.throttle_merge_filter(filter, job.throttle_filter) + when new_filter = job.send(:rocket_job_evaluate_throttles) + rocket_job_merge_filter(filter, new_filter) # Restore retrieved job so that other workers can process it later job.set(worker_name: nil, state: :queued) else job.worker_name = worker_name job.rocket_job_fail_on_exception!(worker_name) do @@ -76,9 +63,35 @@ def requeue_dead_server(server_name) # Need to requeue paused, failed since user may have transitioned job before it finished where(:state.in => [:running, :paused, :failed]).each do |job| job.requeue!(server_name) if job.may_requeue?(server_name) end + end + + # DEPRECATED + def perform_later(args, &block) + if RocketJob::Config.inline_mode + perform_now(args, &block) + else + job = new(args) + block.call(job) if block + job.save! + job + end + end + + private + + def rocket_job_merge_filter(target, source) + source.each_pair do |k, v| + target[k] = + if previous = target[k] + v.is_a?(Array) ? previous + v : v + else + v + end + end + target end end # Runs the job now in the current thread. #