lib/rocket_job/supervisor.rb in rocketjob-5.1.1 vs lib/rocket_job/supervisor.rb in rocketjob-5.2.0.beta1

- old
+ new

@@ -1,6 +1,6 @@ -require 'rocket_job/supervisor/shutdown' +require "rocket_job/supervisor/shutdown" module RocketJob # Starts a server instance, along with the workers and ensures workers remain running until they need to shutdown. class Supervisor include SemanticLogger::Loggable @@ -9,11 +9,11 @@ attr_reader :server, :worker_pool attr_accessor :worker_id # Start the Supervisor, using the supplied attributes to create a new Server instance. def self.run - Thread.current.name = 'rocketjob main' + Thread.current.name = "rocketjob main" RocketJob.create_indexes register_signal_handlers server = Server.create! new(server).run @@ -27,13 +27,13 @@ @mutex = Mutex.new end def run logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}" - logger.info('Running with filter', Config.filter) if Config.filter + logger.info("Running with filter", Config.filter) if Config.filter server.started! - logger.info 'Rocket Job Server started' + logger.info "Rocket Job Server started" event_listener = Thread.new { Event.listener } Subscribers::Server.subscribe(self) do Subscribers::Worker.subscribe(self) do Subscribers::Logger.subscribe do @@ -41,32 +41,32 @@ stop! end end end rescue ::Mongoid::Errors::DocumentNotFound - logger.info('Server has been destroyed. Going down hard!') - rescue Exception => exc - logger.error('RocketJob::Server is stopping due to an exception', exc) + logger.info("Server has been destroyed. Going down hard!") + rescue Exception => e + logger.error("RocketJob::Server is stopping due to an exception", e) ensure - event_listener.kill if event_listener + event_listener&.kill # Logs the backtrace for each running worker worker_pool.log_backtraces - logger.info('Shutdown Complete') + logger.info("Shutdown Complete") end def stop! server.stop! if server.may_stop? worker_pool.stop - while !worker_pool.join - logger.info 'Waiting for workers to finish processing ...' + until worker_pool.join + logger.info "Waiting for workers to finish processing ..." # One or more workers still running so update heartbeat so that server reports "alive". server.refresh(worker_pool.living_count) end end def supervise_pool stagger = true - while !self.class.shutdown? + until self.class.shutdown? synchronize do if server.running? worker_pool.prune worker_pool.rebalance(server.max_workers, stagger) stagger = false