lib/rocket_job/supervisor.rb in rocketjob-5.1.1 vs lib/rocket_job/supervisor.rb in rocketjob-5.2.0.beta1
- old
+ new
@@ -1,6 +1,6 @@
-require 'rocket_job/supervisor/shutdown'
+require "rocket_job/supervisor/shutdown"
module RocketJob
# Starts a server instance, along with the workers and ensures workers remain running until they need to shutdown.
class Supervisor
include SemanticLogger::Loggable
@@ -9,11 +9,11 @@
attr_reader :server, :worker_pool
attr_accessor :worker_id
# Start the Supervisor, using the supplied attributes to create a new Server instance.
def self.run
- Thread.current.name = 'rocketjob main'
+ Thread.current.name = "rocketjob main"
RocketJob.create_indexes
register_signal_handlers
server = Server.create!
new(server).run
@@ -27,13 +27,13 @@
@mutex = Mutex.new
end
def run
logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}"
- logger.info('Running with filter', Config.filter) if Config.filter
+ logger.info("Running with filter", Config.filter) if Config.filter
server.started!
- logger.info 'Rocket Job Server started'
+ logger.info "Rocket Job Server started"
event_listener = Thread.new { Event.listener }
Subscribers::Server.subscribe(self) do
Subscribers::Worker.subscribe(self) do
Subscribers::Logger.subscribe do
@@ -41,32 +41,32 @@
stop!
end
end
end
rescue ::Mongoid::Errors::DocumentNotFound
- logger.info('Server has been destroyed. Going down hard!')
- rescue Exception => exc
- logger.error('RocketJob::Server is stopping due to an exception', exc)
+ logger.info("Server has been destroyed. Going down hard!")
+ rescue Exception => e
+ logger.error("RocketJob::Server is stopping due to an exception", e)
ensure
- event_listener.kill if event_listener
+ event_listener&.kill
# Logs the backtrace for each running worker
worker_pool.log_backtraces
- logger.info('Shutdown Complete')
+ logger.info("Shutdown Complete")
end
def stop!
server.stop! if server.may_stop?
worker_pool.stop
- while !worker_pool.join
- logger.info 'Waiting for workers to finish processing ...'
+ until worker_pool.join
+ logger.info "Waiting for workers to finish processing ..."
# One or more workers still running so update heartbeat so that server reports "alive".
server.refresh(worker_pool.living_count)
end
end
def supervise_pool
stagger = true
- while !self.class.shutdown?
+ until self.class.shutdown?
synchronize do
if server.running?
worker_pool.prune
worker_pool.rebalance(server.max_workers, stagger)
stagger = false