lib/rocket_job/worker.rb in rocketjob-5.4.1 vs lib/rocket_job/worker.rb in rocketjob-6.0.0.rc1

- old
+ new

@@ -1,81 +1,58 @@ -require "concurrent" -require "forwardable" module RocketJob # Worker # # A worker runs on a single operating system thread # Is usually started under a Rocket Job server process. class Worker include SemanticLogger::Loggable - include ActiveSupport::Callbacks - define_callbacks :running - attr_accessor :id, :current_filter - attr_reader :thread, :name, :inline, :server_name + attr_reader :name, :server_name # Raised when a worker is killed so that it shutdown immediately, yet cleanly. # # Note: # - It is not recommended to catch this exception since it is to shutdown workers quickly. class Shutdown < RuntimeError end - def self.before_running(*filters, &blk) - set_callback(:running, :before, *filters, &blk) - end - - def self.after_running(*filters, &blk) - set_callback(:running, :after, *filters, &blk) - end - - def self.around_running(*filters, &blk) - set_callback(:running, :around, *filters, &blk) - end - - def initialize(id: 0, server_name: "inline:0", inline: false) + def initialize(id: 0, server_name: "inline:0") @id = id @server_name = server_name - @shutdown = Concurrent::Event.new @name = "#{server_name}:#{id}" @re_check_start = Time.now @current_filter = Config.filter || {} - @thread = Thread.new { run } unless inline - @inline = inline end def alive? - inline ? true : @thread.alive? + true end def backtrace - inline ? Thread.current.backtrace : @thread.backtrace + Thread.current.backtrace end - def join(*args) - @thread.join(*args) unless inline + def join(*_args) + true end - # Send each active worker the RocketJob::ShutdownException so that stops processing immediately. def kill - return true if inline - - @thread.raise(Shutdown, "Shutdown due to kill request for worker: #{name}") if @thread.alive? + true end def shutdown? - @shutdown.set? + false end def shutdown! - @shutdown.set + true end # Returns [true|false] whether the shutdown indicator was set - def wait_for_shutdown?(timeout = nil) - @shutdown.wait(timeout) + def wait_for_shutdown?(_timeout = nil) + false end # Process jobs until it shuts down # # Params @@ -144,10 +121,12 @@ # Batch has its own throttles for slices. return job if job.running? # Should this job be throttled? next if job.fail_on_exception! { throttled_job?(job) } + # Job failed during throttle execution? + next if job.failed? # Start this job! job.fail_on_exception! { job.start!(name) } return job if job.running? end @@ -169,30 +148,17 @@ # and assigns it to this worker. # # Applies the current filter to exclude filtered jobs. # # Returns nil if no jobs are available for processing. - if Mongoid::VERSION.to_f >= 7.1 - def find_and_assign_job - SemanticLogger.silence(:info) do - scheduled = RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now) - working = RocketJob::Job.queued.or(state: :running, sub_state: :processing) - query = RocketJob::Job.and(working, scheduled) - query = query.and(current_filter) unless current_filter.blank? - update = {"$set" => {"worker_name" => name, "state" => "running"}} - query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true) - end - end - else - def find_and_assign_job - SemanticLogger.silence(:info) do - scheduled = {"$or" => [{run_at: nil}, {:run_at.lte => Time.now}]} - working = {"$or" => [{state: :queued}, {state: :running, sub_state: :processing}]} - query = RocketJob::Job.and(working, scheduled) - query = query.where(current_filter) unless current_filter.blank? - update = {"$set" => {"worker_name" => name, "state" => "running"}} - query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true) - end + def find_and_assign_job + SemanticLogger.silence(:info) do + scheduled = RocketJob::Job.where(run_at: nil).or(:run_at.lte => Time.now) + working = RocketJob::Job.queued.or(state: "running", sub_state: "processing") + query = RocketJob::Job.and(working, scheduled) + query = query.and(current_filter) unless current_filter.blank? + update = {"$set" => {"worker_name" => name, "state" => "running"}} + query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true) end end # Add the supplied filter to the current filter. def add_to_current_filter(filter)