lib/rocket_job/worker.rb in rocketjob-4.0.0 vs lib/rocket_job/worker.rb in rocketjob-4.1.0

- old
+ new

@@ -6,21 +6,23 @@ # A worker runs on a single operating system thread # Is usually started under a Rocket Job server process. class Worker include SemanticLogger::Loggable include ActiveSupport::Callbacks - extend Forwardable - def_delegator :@thread, :alive? - def_delegator :@thread, :backtrace - def_delegator :@thread, :join - define_callbacks :running attr_accessor :id, :re_check_seconds, :filter, :current_filter - attr_reader :thread, :name + attr_reader :thread, :name, :inline + # Raised when a worker is killed so that it shutdown immediately, yet cleanly. + # + # Note: + # - It is not recommended to catch this exception since it is to shutdown workers quickly. + class Shutdown < Interrupt + end + def self.before_running(*filters, &blk) set_callback(:running, :before, *filters, &blk) end def self.after_running(*filters, &blk) @@ -36,43 +38,52 @@ inline: false, re_check_seconds: Config.instance.re_check_seconds, filter: nil) @id = id @server_name = server_name - @shutdown = - if defined?(Concurrent::JavaAtomicBoolean) || defined?(Concurrent::CAtomicBoolean) - Concurrent::AtomicBoolean.new(false) - else - false - end + @shutdown = Concurrent::Event.new @name = "#{server_name}:#{id}" @re_check_seconds = (re_check_seconds || 60).to_f @re_check_start = Time.now @filter = filter.nil? ? {} : filter.dup @current_filter = @filter.dup @thread = Thread.new { run } unless inline + @inline = inline end - if defined?(Concurrent::JavaAtomicBoolean) || defined?(Concurrent::CAtomicBoolean) - # Tells this worker to shutdown as soon the current job/slice is complete - def shutdown! - @shutdown.make_true - end + def alive? + inline ? true : @thread.alive? + end - def shutdown? - @shutdown.value - end - else - def shutdown! - @shutdown = true - end + def backtrace + inline ? Thread.current.backtrace : @thread.backtrace + end - def shutdown? - @shutdown - end + def join(*args) + @thread.join(*args) unless inline end + # Send each active worker the RocketJob::ShutdownException so that stops processing immediately. + def kill + return true if inline + + @thread.raise(Shutdown, "Shutdown due to kill request for worker: #{name}") if @thread.alive? + end + + def shutdown? + @shutdown.set? + end + + def shutdown! + @shutdown.set + end + + # Returns [true|false] whether the shutdown indicator was set + def wait_for_shutdown?(timeout = nil) + @shutdown.wait(timeout) + end + private # Process jobs until it shuts down # # Params @@ -80,17 +91,16 @@ # The number of this worker for logging purposes def run Thread.current.name = format('rocketjob %03i', id) logger.info 'Started' until shutdown? + wait = RocketJob::Config.instance.max_poll_seconds if process_available_jobs # Keeps workers staggered across the poll interval so that # all workers don't poll at the same time - sleep rand(RocketJob::Config.instance.max_poll_seconds * 1000) / 1000 - else - break if shutdown? - sleep RocketJob::Config.instance.max_poll_seconds + wait = rand(wait * 1000) / 1000 end + break if wait_for_shutdown?(wait) end logger.info 'Stopping' rescue Exception => exc logger.fatal('Unhandled exception in job processing thread', exc) ensure