lib/rocket_job/worker.rb in rocketjob-2.0.0 vs lib/rocket_job/worker.rb in rocketjob-2.1.1

- old
+ new

@@ -176,11 +176,11 @@ count: {'$sum' => 1} } } ] ).each do |result| - counts[result['_id']] = result['count'] + counts[result['_id'].to_sym] = result['count'] end counts end # Returns [Boolean] whether the worker is shutting down @@ -193,11 +193,11 @@ # Possible causes for a worker to miss its heartbeats: # - The worker process has died # - The worker process is "hanging" # - The worker is no longer able to communicate with the MongoDB Server def zombie?(missed = 4) - return false unless running? + return false unless running? || stopping? return true if heartbeat.nil? || heartbeat.updated_at.nil? dead_seconds = Config.instance.heartbeat_seconds * missed (Time.now - heartbeat.updated_at) >= dead_seconds end @@ -258,17 +258,29 @@ adjust_worker_threads # Stop worker if shutdown indicator was set stop! if self.class.shutdown? && may_stop? end + logger.info 'Waiting for worker threads to stop' - # TODO Put a timeout on join. - # Log Thread dump for active threads - # Compare thread dumps for any changes, force down if no change? - # reload, if model missing: Send Shutdown exception to each thread - # 5 more seconds then exit - worker_threads.each { |t| t.join } + while thread = worker_threads.first + if thread.join(5) + # Worker thread is dead + worker_threads.shift + else + # Timeout waiting for thread to stop + begin + update_attributes_and_reload( + 'heartbeat.updated_at' => Time.now, + 'heartbeat.current_threads' => worker_count + ) + rescue MongoMapper::DocumentNotFound + logger.error('Worker has been destroyed. Going down hard!') + break + end + end + end logger.info 'Shutdown' rescue Exception => exc logger.error('RocketJob::Worker is stopping due to an exception', exc) end @@ -334,9 +346,11 @@ end end logger.info "Stopping. Worker state: #{state.inspect}" rescue Exception => exc logger.fatal('Unhandled exception in job processing thread', exc) + ensure + ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base) end # Process the next available job # Returns [Boolean] whether any job was actually processed def process_available_jobs