lib/rocket_job/server.rb in rocketjob-3.0.0.rc1 vs lib/rocket_job/server.rb in rocketjob-3.0.0.rc2
- old
+ new
@@ -229,35 +229,12 @@
logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}"
build_heartbeat(updated_at: Time.now, workers: 0)
started!
logger.info 'RocketJob Server started'
- stagger = true
- while running? || paused?
- SemanticLogger.silence(:info) do
- find_and_update(
- 'heartbeat.updated_at' => Time.now,
- 'heartbeat.workers' => worker_count
- )
- end
- if paused?
- workers.each(&:shutdown!)
- stagger = true
- end
+ run_workers
- # In case number of threads has been modified
- adjust_workers(stagger)
- stagger = false
-
- # Stop server if shutdown indicator was set
- if self.class.shutdown? && may_stop?
- stop!
- else
- sleep Config.instance.heartbeat_seconds
- end
- end
-
logger.info 'Waiting for workers to stop'
# Tell each worker to shutdown cleanly
workers.each(&:shutdown!)
while worker = workers.first
@@ -283,10 +260,37 @@
if SemanticLogger::VERSION.to_i >= 4
workers.each { |thread| logger.backtrace(thread: thread) }
end
end
+ def run_workers
+ stagger = true
+ while running? || paused?
+ SemanticLogger.silence(:info) do
+ find_and_update(
+ 'heartbeat.updated_at' => Time.now,
+ 'heartbeat.workers' => worker_count
+ )
+ end
+ if paused?
+ workers.each(&:shutdown!)
+ stagger = true
+ end
+
+ # In case number of threads has been modified
+ adjust_workers(stagger)
+ stagger = false
+
+ # Stop server if shutdown indicator was set
+ if self.class.shutdown? && may_stop?
+ stop!
+ else
+ sleep Config.instance.heartbeat_seconds
+ end
+ end
+ end
+
# Returns [Fixnum] number of workers (threads) that are alive
def worker_count
workers.count(&:alive?)
end
@@ -317,10 +321,10 @@
# Need to add more workers?
if count < max_workers
worker_count = max_workers - count
logger.info "Starting #{worker_count} workers"
worker_count.times.each do
- sleep (Config.instance.max_poll_seconds.to_f / max_workers) * (next_worker_id - 1) if stagger_workers
+ sleep (Config.instance.max_poll_seconds.to_f / max_workers) if stagger_workers
return if shutdown?
# Start worker
begin
workers << Worker.new(id: next_worker_id, server_name: name, filter: filter)
rescue Exception => exc