lib/rocket_job/plugins/job/worker.rb in rocketjob-3.4.3 vs lib/rocket_job/plugins/job/worker.rb in rocketjob-3.5.0

- old
+ new

@@ -8,19 +8,19 @@ extend ActiveSupport::Concern module ClassMethods # Run this job now. # - # The job is not saved to the database since it is processed entriely in memory + # The job is not saved to the database since it is processed entirely in memory # As a result before_save and before_destroy callbacks will not be called. # Validations are still called however prior to calling #perform # # Note: # - Only batch throttles are checked when perform_now is called. - def perform_now(args, &block) + def perform_now(args) job = new(args) - block.call(job) if block + yield(job) if block_given? job.perform_now job end # Returns the next job to work on in priority based order @@ -36,18 +36,17 @@ # # Note: # If a job is in queued state it will be started def rocket_job_next_job(worker_name, filter = {}) while (job = rocket_job_retrieve(worker_name, filter)) - case - when job.running? - # Batch Job - return job - when job.expired? + # Batch Job? + return job if job.running? + + if job.expired? job.rocket_job_fail_on_exception!(worker_name) { job.destroy } logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}" - when new_filter = job.send(:rocket_job_evaluate_throttles) + elsif (new_filter = job.send(:rocket_job_evaluate_throttles)) rocket_job_merge_filter(filter, new_filter) # Restore retrieved job so that other workers can process it later job.set(worker_name: nil, state: :queued) else job.worker_name = worker_name @@ -60,33 +59,33 @@ end # Requeues all jobs that were running on a server that died def requeue_dead_server(server_name) # Need to requeue paused, failed since user may have transitioned job before it finished - where(:state.in => [:running, :paused, :failed]).each do |job| + where(:state.in => %i[running paused failed]).each do |job| job.requeue!(server_name) if job.may_requeue?(server_name) end end # DEPRECATED def perform_later(args, &block) if RocketJob::Config.inline_mode perform_now(args, &block) else job = new(args) - block.call(job) if block + yield(job) if block job.save! job end end private def rocket_job_merge_filter(target, source) source.each_pair do |k, v| target[k] = - if previous = target[k] + if (previous = target[k]) v.is_a?(Array) ? previous + v : v else v end end @@ -134,16 +133,14 @@ rescue Exception => exc if failed? || !may_fail? self.exception = JobException.from_exception(exc) exception.worker_name = worker_name save! unless new_record? || destroyed? + elsif new_record? || destroyed? + fail(worker_name, exc) else - if new_record? || destroyed? - fail(worker_name, exc) - else - fail!(worker_name, exc) - end + fail!(worker_name, exc) end raise exc if re_raise_exceptions end # Works on this job @@ -152,11 +149,11 @@ # # If an exception is thrown the job is marked as failed and the exception # is set in the job itself. # # Thread-safe, can be called by multiple threads at the same time - def rocket_job_work(worker, re_raise_exceptions = false, filter = nil) + def rocket_job_work(worker, re_raise_exceptions = false, _filter = nil) raise(ArgumentError, 'Job must be started before calling #rocket_job_work') unless running? rocket_job_fail_on_exception!(worker.name, re_raise_exceptions) do if _perform_callbacks.empty? @rocket_job_output = perform else @@ -184,10 +181,9 @@ # Returns [Hash<String:[Array<ActiveWorker>]>] All servers actively working on this job def rocket_job_active_workers(server_name = nil) return [] if !running? || (server_name && !worker_on_server?(server_name)) [ActiveWorker.new(worker_name, started_at, self)] end - end end end end