lib/rocket_job/plugins/job/worker.rb in rocketjob-5.1.1 vs lib/rocket_job/plugins/job/worker.rb in rocketjob-5.2.0.beta1

- old
+ new

@@ -1,6 +1,6 @@ -require 'active_support/concern' +require "active_support/concern" # Worker behavior for a job module RocketJob module Plugins module Job @@ -21,68 +21,19 @@ yield(job) if block_given? job.perform_now job end - # Returns the next job to work on in priority based order - # Returns nil if there are currently no queued jobs, or processing batch jobs - # with records that require processing - # - # Parameters - # worker_name [String] - # Name of the worker that will be processing this job - # - # skip_job_ids [Array<BSON::ObjectId>] - # Job ids to exclude when looking for the next job - # - # Note: - # If a job is in queued state it will be started - def rocket_job_next_job(worker_name, filter = {}) - while (job = rocket_job_retrieve(worker_name, filter)) - # Batch Job? - return job if job.running? - - if job.expired? - job.rocket_job_fail_on_exception!(worker_name) { job.destroy } - logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}" - elsif (new_filter = job.send(:rocket_job_evaluate_throttles)) - rocket_job_merge_filter(filter, new_filter) - # Restore retrieved job so that other workers can process it later - job.set(worker_name: nil, state: :queued) - else - job.worker_name = worker_name - job.rocket_job_fail_on_exception!(worker_name) do - job.start! - end - return job if job.running? - end - end - end - # Requeues all jobs that were running on a server that died def requeue_dead_server(server_name) # Need to requeue paused, failed since user may have transitioned job before it finished with(read: {mode: :primary}) do |conn| conn.where(:state.in => %i[running paused failed]).each do |job| job.requeue!(server_name) if job.may_requeue?(server_name) end end end - - private - - def rocket_job_merge_filter(target, source) - source.each_pair do |k, v| - target[k] = - if (previous = target[k]) - v.is_a?(Array) ? previous + v : v - else - v - end - end - target - end end # Runs the job now in the current thread. # # Validations are called prior to running the job. @@ -110,42 +61,42 @@ # Fail this job in the event of an exception. # # The job is automatically saved only if an exception is raised in the supplied block. # - # worker_name: [String] - # Name of the server on which the exception has occurred - # # re_raise_exceptions: [true|false] # Re-raise the exception after updating the job # Default: false - def rocket_job_fail_on_exception!(worker_name, re_raise_exceptions = false) - yield - rescue Exception => exc - if failed? || !may_fail? - self.exception = JobException.from_exception(exc) - exception.worker_name = worker_name - save! unless new_record? || destroyed? - elsif new_record? || destroyed? - fail(worker_name, exc) - else - fail!(worker_name, exc) + def fail_on_exception!(re_raise_exceptions = false, &block) + SemanticLogger.named_tagged(job: id.to_s, &block) + rescue Exception => e + SemanticLogger.named_tagged(job: id.to_s) do + if failed? || !may_fail? + self.exception = JobException.from_exception(e) + exception.worker_name = worker_name + save! unless new_record? || destroyed? + elsif new_record? || destroyed? + fail(worker_name, e) + else + fail!(worker_name, e) + end + raise e if re_raise_exceptions end - raise exc if re_raise_exceptions end # Works on this job # - # Returns [true|false] whether this job should be excluded from the next lookup + # Returns [true|false] whether any work was performed. # # If an exception is thrown the job is marked as failed and the exception # is set in the job itself. # # Thread-safe, can be called by multiple threads at the same time - def rocket_job_work(worker, re_raise_exceptions = false, _filter = nil) - raise(ArgumentError, 'Job must be started before calling #rocket_job_work') unless running? - rocket_job_fail_on_exception!(worker.name, re_raise_exceptions) do + def rocket_job_work(_worker, re_raise_exceptions = false) + raise(ArgumentError, "Job must be started before calling #rocket_job_work") unless running? + + fail_on_exception!(re_raise_exceptions) do if _perform_callbacks.empty? @rocket_job_output = perform else # Allows @rocket_job_output to be modified by after/around callbacks run_callbacks(:perform) do @@ -154,11 +105,11 @@ end end if collect_output? # Result must be a Hash, if not put it in a Hash - self.result = @rocket_job_output.is_a?(Hash) ? @rocket_job_output : {'result' => @rocket_job_output} + self.result = @rocket_job_output.is_a?(Hash) ? @rocket_job_output : {"result" => @rocket_job_output} end if new_record? || destroyed? complete if may_complete? else @@ -169,9 +120,10 @@ end # Returns [Hash<String:[Array<ActiveWorker>]>] All servers actively working on this job def rocket_job_active_workers(server_name = nil) return [] if !running? || (server_name && !worker_on_server?(server_name)) + [ActiveWorker.new(worker_name, started_at, self)] end end end end