lib/rocket_job/worker.rb in rocketjob-2.0.0 vs lib/rocket_job/worker.rb in rocketjob-2.1.1
- old
+ new
@@ -176,11 +176,11 @@
count: {'$sum' => 1}
}
}
]
).each do |result|
- counts[result['_id']] = result['count']
+ counts[result['_id'].to_sym] = result['count']
end
counts
end
# Returns [Boolean] whether the worker is shutting down
@@ -193,11 +193,11 @@
# Possible causes for a worker to miss its heartbeats:
# - The worker process has died
# - The worker process is "hanging"
# - The worker is no longer able to communicate with the MongoDB Server
def zombie?(missed = 4)
- return false unless running?
+ return false unless running? || stopping?
return true if heartbeat.nil? || heartbeat.updated_at.nil?
dead_seconds = Config.instance.heartbeat_seconds * missed
(Time.now - heartbeat.updated_at) >= dead_seconds
end
@@ -258,17 +258,29 @@
adjust_worker_threads
# Stop worker if shutdown indicator was set
stop! if self.class.shutdown? && may_stop?
end
+
logger.info 'Waiting for worker threads to stop'
- # TODO Put a timeout on join.
- # Log Thread dump for active threads
- # Compare thread dumps for any changes, force down if no change?
- # reload, if model missing: Send Shutdown exception to each thread
- # 5 more seconds then exit
- worker_threads.each { |t| t.join }
+ while thread = worker_threads.first
+ if thread.join(5)
+ # Worker thread is dead
+ worker_threads.shift
+ else
+ # Timeout waiting for thread to stop
+ begin
+ update_attributes_and_reload(
+ 'heartbeat.updated_at' => Time.now,
+ 'heartbeat.current_threads' => worker_count
+ )
+ rescue MongoMapper::DocumentNotFound
+ logger.error('Worker has been destroyed. Going down hard!')
+ break
+ end
+ end
+ end
logger.info 'Shutdown'
rescue Exception => exc
logger.error('RocketJob::Worker is stopping due to an exception', exc)
end
@@ -334,9 +346,11 @@
end
end
logger.info "Stopping. Worker state: #{state.inspect}"
rescue Exception => exc
logger.fatal('Unhandled exception in job processing thread', exc)
+ ensure
+ ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)
end
# Process the next available job
# Returns [Boolean] whether any job was actually processed
def process_available_jobs