lib/rocket_job/worker.rb in rocketjob-1.3.0 vs lib/rocket_job/worker.rb in rocketjob-2.0.0.rc1
- old
+ new
@@ -1,8 +1,8 @@
# encoding: UTF-8
require 'socket'
-require 'aasm'
+require 'concurrent'
module RocketJob
# Worker
#
# On startup a worker instance will automatically register itself
# if not already present
@@ -25,17 +25,14 @@
#
# Sending the kill signal locally will result in starting the shutdown process
# immediately. Via the UI or Ruby code the worker can take up to 15 seconds
# (the heartbeat interval) to start shutting down.
class Worker
- include MongoMapper::Document
- include AASM
+ include Plugins::Document
+ include Plugins::StateMachine
include SemanticLogger::Loggable
- # Prevent data in MongoDB from re-defining the model behavior
- #self.static_keys = true
-
# @formatter:off
# Unique Name of this worker instance
# Defaults to the `hostname` but _must_ be overridden if multiple Worker instances
# are started on the same host
# The unique name is used on re-start to re-queue any jobs that were being processed
@@ -71,42 +68,53 @@
transitions from: :starting, to: :running
before do
self.started_at = Time.now
end
end
+
event :pause do
transitions from: :running, to: :paused
end
+
event :resume do
transitions from: :paused, to: :running
end
+
event :stop do
transitions from: :running, to: :stopping
transitions from: :paused, to: :stopping
transitions from: :starting, to: :stopping
end
end
# @formatter:on
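
The events above are exercised elsewhere in this diff through their AASM-style helpers (started!, stop!, resume!, may_stop?). A minimal console sketch of the same transitions, assuming at least one running worker document exists; the bang form both transitions and persists the state, which is what stop_all and resume_all below rely on:

  worker = RocketJob::Worker.where(state: :running).first
  worker.pause!                       # :running -> :paused
  worker.resume! if worker.paused?    # :paused  -> :running
  worker.stop!   if worker.may_stop?  # :running -> :stopping; the run loop then winds down
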
- attr_reader :thread_pool
-
# Requeue any jobs being worked by this worker when it is destroyed
before_destroy :requeue_jobs
# Run the worker process
# Attributes supplied are passed to #new
def self.run(attrs={})
- worker = new(attrs)
- worker.build_heartbeat
- worker.save!
+ Thread.current.name = 'rocketjob main'
create_indexes
register_signal_handlers
- if defined?(RocketJobPro) && (RocketJob::Job.database.name != RocketJob::SlicedJob.database.name)
+ if defined?(RocketJobPro) && (RocketJob::Job.database.name != RocketJob::Jobs::PerformanceJob.database.name)
raise 'The RocketJob configuration is being applied after the system has been initialized'
end
- logger.info "Using MongoDB Database: #{RocketJob::Job.database.name}"
- worker.run
+
+ worker = create!(attrs)
+ if worker.max_threads == 0
+ # Does not start any additional threads and runs the worker in the current thread.
+ # No heartbeats are performed, so this worker will appear as a zombie in RJMC (Rocket Job Mission Control).
+ # Designed for profiling purposes where a single thread is much simpler to profile.
+ worker.started!
+ worker.send(:worker, 0)
+ else
+ worker.send(:run)
+ end
+
+ ensure
+ worker.destroy if worker
end
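
The rewritten run method now owns the whole lifecycle: create!, signal handlers, the run loop, and worker.destroy in the ensure block. A hedged usage sketch; name and max_threads are the only attributes taken from this file and the values are illustrative:

  # Normal operation: registers the worker, starts max_threads worker threads
  # plus the heartbeat loop, and destroys the worker document on exit.
  RocketJob::Worker.run(name: 'batch-worker-1', max_threads: 10)

  # Profiling mode from the comments above: everything runs in the calling thread,
  # no heartbeats are written, so the worker shows up as a zombie in RJMC.
  RocketJob::Worker.run(max_threads: 0)
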
# Create indexes
def self.create_indexes
ensure_index [[:name, 1]], background: true, unique: true
@@ -125,15 +133,10 @@
count += 1
end
count
end
- def self.destroy_dead_workers
- warn 'RocketJob::Worker.destroy_dead_workers is deprecated, use RocketJob::Worker.destroy_zombies'
- destroy_zombies
- end
-
# Stop all running, paused, or starting workers
def self.stop_all
where(state: [:running, :paused, :starting]).each(&:stop!)
end
@@ -145,93 +148,139 @@
# Resume all paused workers
def self.resume_all
paused.each(&:resume!)
end
+ # Returns [Hash<String:Integer>] of the number of workers in each state.
+ # Note: if there are no workers in a particular state, the hash will not contain a key for that state.
+ #
+ # Example workers in every state:
+ # RocketJob::Worker.counts_by_state
+ # # => {
+ # :starting => 1,
+ # :running => 6,
+ # :paused => 2,
+ # :stopping => 1
+ # }
+ #
+ # Example no workers active:
+ # RocketJob::Worker.counts_by_state
+ # # => {}
+ def self.counts_by_state
+ counts = {}
+ collection.aggregate([
+ {
+ '$group' => {
+ _id: '$state',
+ count: {'$sum' => 1}
+ }
+ }
+ ]
+ ).each do |result|
+ counts[result['_id']] = result['count']
+ end
+ counts
+ end
+
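
Because counts_by_state is a single $group aggregation, states with no workers simply have no key, so callers should default missing entries to zero. A small usage sketch; whether the keys come back as symbols or strings depends on how the state field is serialized, hence the defensive lookups:

  counts  = RocketJob::Worker.counts_by_state
  running = counts[:running] || counts['running'] || 0
  puts "#{running} worker(s) currently running"
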
# Returns [Boolean] whether the worker is shutting down
def shutting_down?
- if self.class.shutdown
- stop! if running?
- true
- else
- !running?
+ self.class.shutdown? || !running?
+ end
+
+ # Returns [true|false] if this worker has missed at least the last 4 heartbeats
+ #
+ # Possible causes for a worker to miss its heartbeats:
+ # - The worker process has died
+ # - The worker process is "hanging"
+ # - The worker is no longer able to communicate with the MongoDB Server
+ def zombie?(missed = 4)
+ return false unless running?
+ return true if heartbeat.updated_at.nil?
+ dead_seconds = Config.instance.heartbeat_seconds * missed
+ (Time.now - heartbeat.updated_at) >= dead_seconds
+ end
+
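
zombie? only compares heartbeat.updated_at against missed * Config.instance.heartbeat_seconds, so with the 15 second heartbeat interval mentioned at the top of this file, four missed beats is roughly a minute of silence. A hedged clean-up sketch using destroy_zombies, whose deprecation shim was removed above; destroying a worker requeues its jobs via the before_destroy callback:

  RocketJob::Worker.where(state: :running).each do |w|
    puts "Zombie worker: #{w.name}" if w.zombie?
  end
  RocketJob::Worker.destroy_zombies
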
+ # On MRI the 'concurrent-ruby-ext' gem may not be loaded
+ if defined?(Concurrent::JavaAtomicBoolean) || defined?(Concurrent::CAtomicBoolean)
+ # Returns [true|false] whether the shutdown indicator has been set for this worker process
+ def self.shutdown?
+ @@shutdown.value
end
+
+ # Set shutdown indicator for this worker process
+ def self.shutdown!
+ @@shutdown.make_true
+ end
+
+ @@shutdown = Concurrent::AtomicBoolean.new(false)
+ else
+ # Returns [true|false] whether the shutdown indicator has been set for this worker process
+ def self.shutdown?
+ @@shutdown
+ end
+
+ # Set shutdown indicator for this worker process
+ def self.shutdown!
+ @@shutdown = true
+ end
+
+ @@shutdown = false
end
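
Both branches expose the same shutdown?/shutdown! pair; the only difference is whether the flag is a Concurrent::AtomicBoolean (when the C or Java extension is available) or a plain class variable. A minimal sketch of the concurrent-ruby API relied on here:

  require 'concurrent'

  flag = Concurrent::AtomicBoolean.new(false)
  flag.value      # => false
  flag.make_true  # atomic write; the C/Java variants avoid a Mutex, which matters inside trap handlers
  flag.value      # => true
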
- # Returns [Array<Thread>] threads in the thread_pool
- def thread_pool
- @thread_pool ||= []
+ private
+
+ attr_reader :worker_threads
+
+ # Returns [Array<Thread>] collection of created worker threads
+ def worker_threads
+ @worker_threads ||= []
end
# Management Thread
def run
- Thread.current.name = 'rocketjob main'
- build_heartbeat unless heartbeat
-
- started
- adjust_thread_pool(true)
- save
+ logger.info "Using MongoDB Database: #{RocketJob::Job.database.name}"
+ build_heartbeat(updated_at: Time.now, current_threads: 0)
+ started!
+ adjust_worker_threads(true)
logger.info "RocketJob Worker started with #{max_threads} workers running"
count = 0
- loop do
- # Update heartbeat so that monitoring tools know that this worker is alive
- set(
+ while running? || paused?
+ sleep Config.instance.heartbeat_seconds
+
+ update_attributes_and_reload(
'heartbeat.updated_at' => Time.now,
- 'heartbeat.current_threads' => thread_pool_count
+ 'heartbeat.current_threads' => worker_count
)
- # Reload the worker model every few heartbeats in case its config was changed
- # TODO make 3 configurable
- if count >= 3
- reload
- adjust_thread_pool
- count = 0
- else
- count += 1
- end
+ # In case number of threads has been modified
+ adjust_worker_threads
- # Stop worker if shutdown signal was raised
- stop! if self.class.shutdown && !stopping?
-
- break if stopping?
-
- sleep Config.instance.heartbeat_seconds
+ # Stop worker if shutdown indicator was set
+ stop! if self.class.shutdown? && may_stop?
end
logger.info 'Waiting for worker threads to stop'
# TODO Put a timeout on join.
# Log Thread dump for active threads
# Compare thread dumps for any changes, force down if no change?
# reload, if model missing: Send Shutdown exception to each thread
# 5 more seconds then exit
- thread_pool.each { |t| t.join }
+ worker_threads.each { |t| t.join }
logger.info 'Shutdown'
rescue Exception => exc
logger.error('RocketJob::Worker is stopping due to an exception', exc)
- ensure
- # Destroy this worker instance
- destroy
end
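
The heartbeat loop above persists two fields every Config.instance.heartbeat_seconds: heartbeat.updated_at and heartbeat.current_threads. A hedged monitoring snippet built on those fields; the running scope is an assumption, mirroring the paused scope that resume_all uses:

  # `running` scope assumed, in the same way resume_all uses `paused`.
  RocketJob::Worker.running.each do |w|
    age = (Time.now - w.heartbeat.updated_at).round
    puts "#{w.name}: #{w.heartbeat.current_threads} threads, last heartbeat #{age}s ago"
  end
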
- def thread_pool_count
- thread_pool.count { |i| i.alive? }
+ # Returns [Fixnum] number of workers (threads) that are alive
+ def worker_count
+ worker_threads.count { |i| i.alive? }
end
- # Returns [true|false] if this worker has missed at least the last 4 heartbeats
- #
- # Possible causes for a worker to miss its heartbeats:
- # - The worker process has died
- # - The worker process is "hanging"
- # - The worker is no longer able to communicate with the MongoDB Server
- def zombie?(missed = 4)
- return false unless running?
- dead_seconds = Config.instance.heartbeat_seconds * missed
- (Time.now - heartbeat.updated_at) >= dead_seconds
- end
-
- protected
-
def next_worker_id
@worker_id ||= 0
@worker_id += 1
end
@@ -242,27 +291,27 @@
# Whether to stagger when the threads poll for work the first time
# It spreads out the queue polling over the max_poll_seconds so
# that not all workers poll at the same time
# The worker also responds faster than max_poll_seconds when a new
# job is added.
- def adjust_thread_pool(stagger_threads=false)
- count = thread_pool_count
+ def adjust_worker_threads(stagger_threads=false)
+ count = worker_count
# Cleanup threads that have stopped
- if count != thread_pool.count
- logger.info "Cleaning up #{thread_pool.count - count} threads that went away"
- thread_pool.delete_if { |t| !t.alive? }
+ if count != worker_threads.count
+ logger.info "Cleaning up #{worker_threads.count - count} threads that went away"
+ worker_threads.delete_if { |t| !t.alive? }
end
return if shutting_down?
# Need to add more threads?
if count < max_threads
thread_count = max_threads - count
logger.info "Starting #{thread_count} threads"
thread_count.times.each do
# Start worker thread
- thread_pool << Thread.new(next_worker_id) do |id|
+ worker_threads << Thread.new(next_worker_id) do |id|
begin
sleep (Config.instance.max_poll_seconds.to_f / max_threads) * (id - 1) if stagger_threads
worker(id)
rescue Exception => exc
logger.fatal('Cannot start worker thread', exc)
@@ -272,89 +321,71 @@
end
end
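
The stagger sleep above spreads the threads' first poll evenly across max_poll_seconds. A quick worked example, assuming max_poll_seconds is 5 and max_threads is 10 (both values are illustrative):

  # Thread ids start at 1, so the first thread polls immediately and each
  # subsequent thread waits a further 0.5 seconds.
  (1..10).map { |id| (5.0 / 10) * (id - 1) }
  # => [0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5]
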
# Keep processing jobs until worker stops running
def worker(worker_id)
- Thread.current.name = "rocketjob #{worker_id}"
+ Thread.current.name = 'rocketjob %03i' % worker_id
logger.info 'Started'
while !shutting_down?
- if process_next_job
- # Keeps workers staggered across the poll interval so that not
- # all workers poll at the same time
+ if process_available_jobs
+ # Keeps workers staggered across the poll interval so that
+ # all workers don't poll at the same time
sleep rand(RocketJob::Config.instance.max_poll_seconds * 1000) / 1000
else
+ break if shutting_down?
sleep RocketJob::Config.instance.max_poll_seconds
end
end
logger.info "Stopping. Worker state: #{state.inspect}"
rescue Exception => exc
logger.fatal('Unhandled exception in job processing thread', exc)
end
# Process the next available job
# Returns [Boolean] whether any job was actually processed
- def process_next_job
+ def process_available_jobs
skip_job_ids = []
- while job = Job.next_job(name, skip_job_ids)
- logger.tagged("Job #{job.id}") do
- if job.work(self)
- return true if shutting_down?
+ processed = false
+ while (job = Job.rocket_job_next_job(name, skip_job_ids)) && !shutting_down?
+ logger.fast_tag("Job #{job.id}") do
+ if job.rocket_job_work(self)
# Need to skip the specified job due to throttling or no work available
skip_job_ids << job.id
else
- return true
+ processed = true
end
end
end
- false
+ processed
end
- # Requeue any jobs assigned to this worker
- def requeue_jobs
- stop! if running? || paused?
- RocketJob::Job.requeue_dead_worker(name)
- end
-
- # Shutdown indicator
- def self.shutdown
- @@shutdown
- end
-
- @@shutdown = false
-
# Register handlers for the various signals
# Term:
# Perform clean shutdown
#
def self.register_signal_handlers
begin
Signal.trap 'SIGTERM' do
- # Cannot use Mutex protected writer here since it is in a signal handler
- @@shutdown = true
- logger.warn 'Shutdown signal (SIGTERM) received. Will shutdown as soon as active jobs/slices have completed.'
+ shutdown!
+ message = 'Shutdown signal (SIGTERM) received. Will shutdown as soon as active jobs/slices have completed.'
+ # Logging uses a mutex to access Queue on MRI/CRuby
+ defined?(JRuby) ? logger.warn(message) : puts(message)
end
Signal.trap 'INT' do
- # Cannot use Mutex protected writer here since it is in a signal handler
- @@shutdown = true
- logger.warn 'Shutdown signal (INT) received. Will shutdown as soon as active jobs/slices have completed.'
+ shutdown!
+ message = 'Shutdown signal (INT) received. Will shutdown as soon as active jobs/slices have completed.'
+ # Logging uses a mutex to access Queue on MRI/CRuby
+ defined?(JRuby) ? logger.warn(message) : puts(message)
end
rescue StandardError
logger.warn 'SIGTERM handler not installed. Not able to shutdown gracefully'
end
end
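
Both traps only flip the shutdown flag; the heartbeat loop notices it on its next pass and calls stop!, after which the worker threads exit via shutting_down?. A hedged way to trigger the same path; worker_pid is a placeholder for the worker's process id:

  # Graceful shutdown: active jobs/slices finish, threads are joined, worker document destroyed.
  Process.kill('TERM', worker_pid)  # worker_pid: placeholder, not defined in this file

  # Inside the same process (for example in a test), the class method has the same effect:
  RocketJob::Worker.shutdown!
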
- # Patch the way MongoMapper reloads a model
- def reload
- if doc = collection.find_one(:_id => id)
- # Clear out keys that are not returned during the reload from MongoDB
- (keys.keys - doc.keys).each { |key| send("#{key}=", nil) }
- initialize_default_values
- load_from_database(doc)
- self
- else
- raise MongoMapper::DocumentNotFound, "Document match #{_id.inspect} does not exist in #{collection.name} collection"
- end
+ # Requeue any jobs assigned to this worker when it is destroyed
+ def requeue_jobs
+ RocketJob::Job.requeue_dead_worker(name)
end
end
end