lib/rocket_job/server.rb in rocketjob-3.4.3 vs lib/rocket_job/server.rb in rocketjob-3.5.0

- old
+ new

@@ -105,11 +105,11 @@ count end # Stop all running, paused, or starting servers def self.stop_all - where(:state.in => [:running, :paused, :starting]).each(&:stop!) + where(:state.in => %i[running paused starting]).each(&:stop!) end # Pause all running servers def self.pause_all running.each(&:pause!) @@ -192,11 +192,11 @@ register_signal_handlers server = create!(attrs) server.send(:run) ensure - server.destroy if server + server&.destroy end # Returns [Boolean] whether the server is shutting down def shutdown? self.class.shutdown? || !running? @@ -205,14 +205,14 @@ # Scope for all zombie servers def self.zombies(missed = 4) dead_seconds = Config.instance.heartbeat_seconds * missed last_heartbeat_time = Time.now - dead_seconds where( - :state.in => [:stopping, :running, :paused], + :state.in => %i[stopping running paused], '$or' => [ - {"heartbeat.updated_at" => {'$exists' => false}}, - {"heartbeat.updated_at" => {'$lte' => last_heartbeat_time}} + {'heartbeat.updated_at' => {'$exists' => false}}, + {'heartbeat.updated_at' => {'$lte' => last_heartbeat_time}} ] ) end # Returns [true|false] if this server has missed at least the last 4 heartbeats @@ -228,31 +228,29 @@ (Time.now - heartbeat.updated_at) >= dead_seconds end private - attr_reader :workers - # Returns [Array<Worker>] collection of workers def workers @workers ||= [] end # Management Thread def run logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}" build_heartbeat(updated_at: Time.now, workers: 0) started! - logger.info 'RocketJob Server started' + logger.info 'Rocket Job Server started' run_workers logger.info 'Waiting for workers to stop' # Tell each worker to shutdown cleanly workers.each(&:shutdown!) - while worker = workers.first + while (worker = workers.first) if worker.join(5) # Worker thread is dead workers.shift else # Timeout waiting for worker to stop @@ -268,13 +266,11 @@ logger.warn('Server has been destroyed. Going down hard!') rescue Exception => exc logger.error('RocketJob::Server is stopping due to an exception', exc) ensure # Logs the backtrace for each running worker - if SemanticLogger::VERSION.to_i >= 4 - workers.each { |worker| logger.backtrace(thread: worker.thread) if worker.thread && worker.alive? } - end + workers.each { |worker| logger.backtrace(thread: worker.thread) if worker.thread && worker.alive? } end def run_workers stagger = true while running? || paused? @@ -330,53 +326,51 @@ end return unless running? # Need to add more workers? - if count < max_workers - worker_count = max_workers - count - logger.info "Starting #{worker_count} workers" - worker_count.times.each do - sleep (Config.instance.max_poll_seconds.to_f / max_workers) if stagger_workers - return if shutdown? - # Start worker - begin - workers << Worker.new(id: next_worker_id, server_name: name, filter: filter) - rescue Exception => exc - logger.fatal('Cannot start worker', exc) - end + return unless count < max_workers + + worker_count = max_workers - count + logger.info "Starting #{worker_count} workers" + worker_count.times.each do + sleep(Config.instance.max_poll_seconds.to_f / max_workers) if stagger_workers + return if shutdown? + # Start worker + begin + workers << Worker.new(id: next_worker_id, server_name: name, filter: filter) + rescue Exception => exc + logger.fatal('Cannot start worker', exc) end end end # Register handlers for the various signals # Term: # Perform clean shutdown # def self.register_signal_handlers - begin - Signal.trap 'SIGTERM' do - shutdown! - message = 'Shutdown signal (SIGTERM) received. Will shutdown as soon as active jobs/slices have completed.' - # Logging uses a mutex to access Queue on MRI/CRuby - defined?(JRuby) ? logger.warn(message) : puts(message) - end + Signal.trap 'SIGTERM' do + shutdown! + message = 'Shutdown signal (SIGTERM) received. Will shutdown as soon as active jobs/slices have completed.' + # Logging uses a mutex to access Queue on MRI/CRuby + defined?(JRuby) ? logger.warn(message) : puts(message) + end - Signal.trap 'INT' do - shutdown! - message = 'Shutdown signal (INT) received. Will shutdown as soon as active jobs/slices have completed.' - # Logging uses a mutex to access Queue on MRI/CRuby - defined?(JRuby) ? logger.warn(message) : puts(message) - end - rescue StandardError - logger.warn 'SIGTERM handler not installed. Not able to shutdown gracefully' + Signal.trap 'INT' do + shutdown! + message = 'Shutdown signal (INT) received. Will shutdown as soon as active jobs/slices have completed.' + # Logging uses a mutex to access Queue on MRI/CRuby + defined?(JRuby) ? logger.warn(message) : puts(message) end + rescue StandardError + logger.warn 'SIGTERM handler not installed. Not able to shutdown gracefully' end + private_class_method :register_signal_handlers + # Requeue any jobs assigned to this server when it is destroyed def requeue_jobs RocketJob::Job.requeue_dead_server(name) end - end end -