lib/rocket_job/worker.rb in rocketjob-4.0.0 vs lib/rocket_job/worker.rb in rocketjob-4.1.0
- old
+ new
@@ -6,21 +6,23 @@
# A worker runs on a single operating system thread
# Is usually started under a Rocket Job server process.
class Worker
include SemanticLogger::Loggable
include ActiveSupport::Callbacks
- extend Forwardable
- def_delegator :@thread, :alive?
- def_delegator :@thread, :backtrace
- def_delegator :@thread, :join
-
define_callbacks :running
attr_accessor :id, :re_check_seconds, :filter, :current_filter
- attr_reader :thread, :name
+ attr_reader :thread, :name, :inline
+ # Raised when a worker is killed so that it shutdown immediately, yet cleanly.
+ #
+ # Note:
+ # - It is not recommended to catch this exception since it is to shutdown workers quickly.
+ class Shutdown < Interrupt
+ end
+
def self.before_running(*filters, &blk)
set_callback(:running, :before, *filters, &blk)
end
def self.after_running(*filters, &blk)
@@ -36,43 +38,52 @@
inline: false,
re_check_seconds: Config.instance.re_check_seconds,
filter: nil)
@id = id
@server_name = server_name
- @shutdown =
- if defined?(Concurrent::JavaAtomicBoolean) || defined?(Concurrent::CAtomicBoolean)
- Concurrent::AtomicBoolean.new(false)
- else
- false
- end
+ @shutdown = Concurrent::Event.new
@name = "#{server_name}:#{id}"
@re_check_seconds = (re_check_seconds || 60).to_f
@re_check_start = Time.now
@filter = filter.nil? ? {} : filter.dup
@current_filter = @filter.dup
@thread = Thread.new { run } unless inline
+ @inline = inline
end
- if defined?(Concurrent::JavaAtomicBoolean) || defined?(Concurrent::CAtomicBoolean)
- # Tells this worker to shutdown as soon the current job/slice is complete
- def shutdown!
- @shutdown.make_true
- end
+ def alive?
+ inline ? true : @thread.alive?
+ end
- def shutdown?
- @shutdown.value
- end
- else
- def shutdown!
- @shutdown = true
- end
+ def backtrace
+ inline ? Thread.current.backtrace : @thread.backtrace
+ end
- def shutdown?
- @shutdown
- end
+ def join(*args)
+ @thread.join(*args) unless inline
end
+ # Send each active worker the RocketJob::ShutdownException so that stops processing immediately.
+ def kill
+ return true if inline
+
+ @thread.raise(Shutdown, "Shutdown due to kill request for worker: #{name}") if @thread.alive?
+ end
+
+ def shutdown?
+ @shutdown.set?
+ end
+
+ def shutdown!
+ @shutdown.set
+ end
+
+ # Returns [true|false] whether the shutdown indicator was set
+ def wait_for_shutdown?(timeout = nil)
+ @shutdown.wait(timeout)
+ end
+
private
# Process jobs until it shuts down
#
# Params
@@ -80,17 +91,16 @@
# The number of this worker for logging purposes
def run
Thread.current.name = format('rocketjob %03i', id)
logger.info 'Started'
until shutdown?
+ wait = RocketJob::Config.instance.max_poll_seconds
if process_available_jobs
# Keeps workers staggered across the poll interval so that
# all workers don't poll at the same time
- sleep rand(RocketJob::Config.instance.max_poll_seconds * 1000) / 1000
- else
- break if shutdown?
- sleep RocketJob::Config.instance.max_poll_seconds
+ wait = rand(wait * 1000) / 1000
end
+ break if wait_for_shutdown?(wait)
end
logger.info 'Stopping'
rescue Exception => exc
logger.fatal('Unhandled exception in job processing thread', exc)
ensure