lib/rocket_job/worker.rb in rocketjob-1.0.0 vs lib/rocket_job/worker.rb in rocketjob-1.1.0
- old
+ new
@@ -34,10 +34,11 @@
include SemanticLogger::Loggable
# Prevent data in MongoDB from re-defining the model behavior
#self.static_keys = true
+ # @formatter:off
# Unique Name of this worker instance
# Defaults to the `hostname` but _must_ be overriden if mutiple Worker instances
# are started on the same host
# The unique name is used on re-start to re-queue any jobs that were being processed
# at the time the worker or host unexpectedly terminated, if any
@@ -84,10 +85,11 @@
transitions from: :running, to: :stopping
transitions from: :paused, to: :stopping
transitions from: :starting, to: :stopping
end
end
+ # @formatter:on
attr_reader :thread_pool
# Requeue any jobs being worked by this worker when it is destroyed
before_destroy :requeue_jobs
@@ -98,11 +100,11 @@
worker = new(attrs)
worker.build_heartbeat
worker.save!
create_indexes
register_signal_handlers
- raise "The RocketJob configuration is being applied after the system has been initialized" unless RocketJob::Job.database.name == RocketJob::SlicedJob.database.name
+ raise 'The RocketJob configuration is being applied after the system has been initialized' unless RocketJob::Job.database.name == RocketJob::SlicedJob.database.name
logger.info "Using MongoDB Database: #{RocketJob::Job.database.name}"
worker.run
end
# Create indexes
@@ -110,43 +112,40 @@
ensure_index [[:name, 1]], background: true, unique: true
# Also create indexes for the jobs collection
Job.create_indexes
end
- # Destroy dead workers ( missed at least the last 4 heartbeats )
- # Requeue jobs assigned to dead workers
- # Destroy dead workers
- def self.destroy_dead_workers
- dead_seconds = Config.instance.heartbeat_seconds * 4
+ # Destroy's all instances of zombie workers and requeues any jobs still "running"
+ # on those workers
+ def self.destroy_zombies
each do |worker|
- next if (Time.now - worker.heartbeat.updated_at) < dead_seconds
- logger.warn "Destroying worker #{worker.name}, and requeueing its jobs"
+ next unless zombie?
+ logger.warn "Destroying zombie worker #{worker.name}, and requeueing its jobs"
worker.destroy
end
end
+ def self.destroy_dead_workers
+ warn 'RocketJob::Worker.destroy_dead_workers is deprecated, use RocketJob::Worker.destroy_zombies'
+ destroy_zombies
+ end
+
# Stop all running, paused, or starting workers
def self.stop_all
- where(state: ['running', 'paused', 'starting']).each { |worker| worker.stop! }
+ where(state: [:running, :paused, :starting]).each(&:stop!)
end
# Pause all running workers
def self.pause_all
- where(state: 'running').each { |worker| worker.pause! }
+ running.each(&:pause!)
end
# Resume all paused workers
def self.resume_all
- each { |worker| worker.resume! if worker.paused? }
+ paused.each(&:resume!)
end
- # Register a handler to perform cleanups etc. whenever a worker is
- # explicitly destroyed
- def self.register_destroy_handler(&block)
- @@destroy_handlers << block
- end
-
# Returns [Boolean] whether the worker is shutting down
def shutting_down?
if self.class.shutdown
stop! if running?
true
@@ -160,11 +159,11 @@
@thread_pool ||= []
end
# Run this instance of the worker
def run
- Thread.current.name = 'RocketJob main'
+ Thread.current.name = 'rocketjob main'
build_heartbeat unless heartbeat
started
adjust_thread_pool(true)
save
@@ -209,13 +208,24 @@
# Destroy this worker instance
destroy
end
def thread_pool_count
- thread_pool.count{ |i| i.alive? }
+ thread_pool.count { |i| i.alive? }
end
+ # Returns [true|false] if this worker has missed at least the last 4 heartbeats
+ #
+ # Possible causes for a worker to miss its heartbeats:
+ # - The worker process has died
+ # - The worker process is "hanging"
+ # - The worker is no longer able to communicate with the MongoDB Server
+ def zombie?(missed = 4)
+ dead_seconds = Config.instance.heartbeat_seconds * missed
+ (Time.now - worker.heartbeat.updated_at) >= dead_seconds
+ end
+
protected
def next_worker_id
@worker_id ||= 0
@worker_id += 1
@@ -277,11 +287,11 @@
end
# Process the next available job
# Returns [Boolean] whether any job was actually processed
def process_next_job
- skip_job_ids = []
+ skip_job_ids = []
while job = Job.next_job(name, skip_job_ids)
logger.tagged("Job #{job.id}") do
if job.work(self)
return true if shutting_down?
# Need to skip the specified job due to throttling or no work available
@@ -295,11 +305,11 @@
end
# Requeue any jobs assigned to this worker
def requeue_jobs
stop! if running? || paused?
- @@destroy_handlers.each { |handler| handler.call(name) }
+ RocketJob::Job.requeue_dead_worker(name)
end
# Mutex protected shutdown indicator
sync_cattr_accessor :shutdown do
false
@@ -309,38 +319,37 @@
# Term:
# Perform clean shutdown
#
def self.register_signal_handlers
begin
- Signal.trap "SIGTERM" do
+ Signal.trap 'SIGTERM' do
# Cannot use Mutex protected writer here since it is in a signal handler
@@shutdown = true
- logger.warn "Shutdown signal (SIGTERM) received. Will shutdown as soon as active jobs/slices have completed."
+ logger.warn 'Shutdown signal (SIGTERM) received. Will shutdown as soon as active jobs/slices have completed.'
end
- Signal.trap "INT" do
+ Signal.trap 'INT' do
# Cannot use Mutex protected writer here since it is in a signal handler
@@shutdown = true
- logger.warn "Shutdown signal (INT) received. Will shutdown as soon as active jobs/slices have completed."
+ logger.warn 'Shutdown signal (INT) received. Will shutdown as soon as active jobs/slices have completed.'
end
- rescue Exception
- logger.warn "SIGTERM handler not installed. Not able to shutdown gracefully"
+ rescue StandardError
+ logger.warn 'SIGTERM handler not installed. Not able to shutdown gracefully'
end
end
# Patch the way MongoMapper reloads a model
def reload
if doc = collection.find_one(:_id => id)
+ # Clear out keys that are not returned during the reload from MongoDB
+ (keys.keys - doc.keys).each { |key| send("#{key}=", nil) }
+ initialize_default_values
load_from_database(doc)
self
else
raise MongoMapper::DocumentNotFound, "Document match #{_id.inspect} does not exist in #{collection.name} collection"
end
end
-
- private
-
- @@destroy_handlers = ThreadSafe::Array.new
end
end