lib/rocket_job/server.rb in rocketjob-3.0.0.rc1 vs lib/rocket_job/server.rb in rocketjob-3.0.0.rc2

- old
+ new

@@ -229,35 +229,12 @@ logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}" build_heartbeat(updated_at: Time.now, workers: 0) started! logger.info 'RocketJob Server started' - stagger = true - while running? || paused? - SemanticLogger.silence(:info) do - find_and_update( - 'heartbeat.updated_at' => Time.now, - 'heartbeat.workers' => worker_count - ) - end - if paused? - workers.each(&:shutdown!) - stagger = true - end + run_workers - # In case number of threads has been modified - adjust_workers(stagger) - stagger = false - - # Stop server if shutdown indicator was set - if self.class.shutdown? && may_stop? - stop! - else - sleep Config.instance.heartbeat_seconds - end - end - logger.info 'Waiting for workers to stop' # Tell each worker to shutdown cleanly workers.each(&:shutdown!) while worker = workers.first @@ -283,10 +260,37 @@ if SemanticLogger::VERSION.to_i >= 4 workers.each { |thread| logger.backtrace(thread: thread) } end end + def run_workers + stagger = true + while running? || paused? + SemanticLogger.silence(:info) do + find_and_update( + 'heartbeat.updated_at' => Time.now, + 'heartbeat.workers' => worker_count + ) + end + if paused? + workers.each(&:shutdown!) + stagger = true + end + + # In case number of threads has been modified + adjust_workers(stagger) + stagger = false + + # Stop server if shutdown indicator was set + if self.class.shutdown? && may_stop? + stop! + else + sleep Config.instance.heartbeat_seconds + end + end + end + # Returns [Fixnum] number of workers (threads) that are alive def worker_count workers.count(&:alive?) end @@ -317,10 +321,10 @@ # Need to add more workers? if count < max_workers worker_count = max_workers - count logger.info "Starting #{worker_count} workers" worker_count.times.each do - sleep (Config.instance.max_poll_seconds.to_f / max_workers) * (next_worker_id - 1) if stagger_workers + sleep (Config.instance.max_poll_seconds.to_f / max_workers) if stagger_workers return if shutdown? # Start worker begin workers << Worker.new(id: next_worker_id, server_name: name, filter: filter) rescue Exception => exc