lib/rocket_job/server.rb in rocketjob-3.4.3 vs lib/rocket_job/server.rb in rocketjob-3.5.0
- old
+ new
@@ -105,11 +105,11 @@
count
end
# Stop all running, paused, or starting servers
def self.stop_all
- where(:state.in => [:running, :paused, :starting]).each(&:stop!)
+ where(:state.in => %i[running paused starting]).each(&:stop!)
end
# Pause all running servers
def self.pause_all
running.each(&:pause!)
@@ -192,11 +192,11 @@
register_signal_handlers
server = create!(attrs)
server.send(:run)
ensure
- server.destroy if server
+ server&.destroy
end
# Returns [Boolean] whether the server is shutting down
def shutdown?
self.class.shutdown? || !running?
@@ -205,14 +205,14 @@
# Scope for all zombie servers
def self.zombies(missed = 4)
dead_seconds = Config.instance.heartbeat_seconds * missed
last_heartbeat_time = Time.now - dead_seconds
where(
- :state.in => [:stopping, :running, :paused],
+ :state.in => %i[stopping running paused],
'$or' => [
- {"heartbeat.updated_at" => {'$exists' => false}},
- {"heartbeat.updated_at" => {'$lte' => last_heartbeat_time}}
+ {'heartbeat.updated_at' => {'$exists' => false}},
+ {'heartbeat.updated_at' => {'$lte' => last_heartbeat_time}}
]
)
end
# Returns [true|false] if this server has missed at least the last 4 heartbeats
@@ -228,31 +228,29 @@
(Time.now - heartbeat.updated_at) >= dead_seconds
end
private
- attr_reader :workers
-
# Returns [Array<Worker>] collection of workers
def workers
@workers ||= []
end
# Management Thread
def run
logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}"
build_heartbeat(updated_at: Time.now, workers: 0)
started!
- logger.info 'RocketJob Server started'
+ logger.info 'Rocket Job Server started'
run_workers
logger.info 'Waiting for workers to stop'
# Tell each worker to shutdown cleanly
workers.each(&:shutdown!)
- while worker = workers.first
+ while (worker = workers.first)
if worker.join(5)
# Worker thread is dead
workers.shift
else
# Timeout waiting for worker to stop
@@ -268,13 +266,11 @@
logger.warn('Server has been destroyed. Going down hard!')
rescue Exception => exc
logger.error('RocketJob::Server is stopping due to an exception', exc)
ensure
# Logs the backtrace for each running worker
- if SemanticLogger::VERSION.to_i >= 4
- workers.each { |worker| logger.backtrace(thread: worker.thread) if worker.thread && worker.alive? }
- end
+ workers.each { |worker| logger.backtrace(thread: worker.thread) if worker.thread && worker.alive? }
end
def run_workers
stagger = true
while running? || paused?
@@ -330,53 +326,51 @@
end
return unless running?
# Need to add more workers?
- if count < max_workers
- worker_count = max_workers - count
- logger.info "Starting #{worker_count} workers"
- worker_count.times.each do
- sleep (Config.instance.max_poll_seconds.to_f / max_workers) if stagger_workers
- return if shutdown?
- # Start worker
- begin
- workers << Worker.new(id: next_worker_id, server_name: name, filter: filter)
- rescue Exception => exc
- logger.fatal('Cannot start worker', exc)
- end
+ return unless count < max_workers
+
+ worker_count = max_workers - count
+ logger.info "Starting #{worker_count} workers"
+ worker_count.times.each do
+ sleep(Config.instance.max_poll_seconds.to_f / max_workers) if stagger_workers
+ return if shutdown?
+ # Start worker
+ begin
+ workers << Worker.new(id: next_worker_id, server_name: name, filter: filter)
+ rescue Exception => exc
+ logger.fatal('Cannot start worker', exc)
end
end
end
# Register handlers for the various signals
# Term:
# Perform clean shutdown
#
def self.register_signal_handlers
- begin
- Signal.trap 'SIGTERM' do
- shutdown!
- message = 'Shutdown signal (SIGTERM) received. Will shutdown as soon as active jobs/slices have completed.'
- # Logging uses a mutex to access Queue on MRI/CRuby
- defined?(JRuby) ? logger.warn(message) : puts(message)
- end
+ Signal.trap 'SIGTERM' do
+ shutdown!
+ message = 'Shutdown signal (SIGTERM) received. Will shutdown as soon as active jobs/slices have completed.'
+ # Logging uses a mutex to access Queue on MRI/CRuby
+ defined?(JRuby) ? logger.warn(message) : puts(message)
+ end
- Signal.trap 'INT' do
- shutdown!
- message = 'Shutdown signal (INT) received. Will shutdown as soon as active jobs/slices have completed.'
- # Logging uses a mutex to access Queue on MRI/CRuby
- defined?(JRuby) ? logger.warn(message) : puts(message)
- end
- rescue StandardError
- logger.warn 'SIGTERM handler not installed. Not able to shutdown gracefully'
+ Signal.trap 'INT' do
+ shutdown!
+ message = 'Shutdown signal (INT) received. Will shutdown as soon as active jobs/slices have completed.'
+ # Logging uses a mutex to access Queue on MRI/CRuby
+ defined?(JRuby) ? logger.warn(message) : puts(message)
end
+ rescue StandardError
+ logger.warn 'SIGTERM handler not installed. Not able to shutdown gracefully'
end
+ private_class_method :register_signal_handlers
+
# Requeue any jobs assigned to this server when it is destroyed
def requeue_jobs
RocketJob::Job.requeue_dead_server(name)
end
-
end
end
-