lib/rocket_job/server.rb in rocketjob-3.0.0.beta2 vs lib/rocket_job/server.rb in rocketjob-3.0.0.beta3

- old
+ new

@@ -28,10 +28,12 @@ class Server include Plugins::Document include Plugins::StateMachine include SemanticLogger::Loggable + store_in collection: 'rocket_job.servers' + # Unique Name of this server instance # Default: `host name:PID` # The unique name is used on re-start to re-queue any jobs that were being processed # at the time the server unexpectedly terminated, if any field :name, type: String, default: -> { "#{SemanticLogger.host}:#{$$}" } @@ -41,10 +43,13 @@ field :max_workers, type: Integer, default: -> { Config.instance.max_workers } # When this server process was started field :started_at, type: Time + # Filter to apply to control which job classes this server can process + field :filter, type: Hash + # The heartbeat information for this server embeds_one :heartbeat, class_name: 'RocketJob::Heartbeat' # Current state # Internal use only. Do not set this field directly @@ -186,11 +191,10 @@ Mongoid::Tasks::Database.create_indexes register_signal_handlers server = create!(attrs) server.send(:run) - ensure server.destroy if server end # Returns [Boolean] whether the server is shutting down @@ -223,26 +227,35 @@ # Management Thread def run logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}" build_heartbeat(updated_at: Time.now, workers: 0) started! - adjust_workers(true) - logger.info "RocketJob Server started with #{workers.size} workers running" + logger.info 'RocketJob Server started' + stagger = true while running? || paused? - sleep Config.instance.heartbeat_seconds + SemanticLogger.silence(:info) do + find_and_update( + 'heartbeat.updated_at' => Time.now, + 'heartbeat.workers' => worker_count + ) + end + if paused? + workers.each(&:shutdown!) + stagger = true + end - find_and_update( - 'heartbeat.updated_at' => Time.now, - 'heartbeat.workers' => worker_count - ) - # In case number of threads has been modified - adjust_workers + adjust_workers(stagger) + stagger = false # Stop server if shutdown indicator was set - stop! if self.class.shutdown? && may_stop? + if self.class.shutdown? && may_stop? + stop! + else + sleep Config.instance.heartbeat_seconds + end end logger.info 'Waiting for workers to stop' # Tell each worker to shutdown cleanly workers.each(&:shutdown!) @@ -291,27 +304,29 @@ # Whether to stagger when the workers poll for work the first time # It spreads out the queue polling over the max_poll_seconds so # that not all workers poll at the same time # The worker also respond faster than max_poll_seconds when a new # job is added. - def adjust_workers(stagger_workers=false) + def adjust_workers(stagger_workers = false) count = worker_count # Cleanup workers that have stopped if count != workers.count logger.info "Cleaning up #{workers.count - count} workers that went away" workers.delete_if { |t| !t.alive? } end + return unless running? + # Need to add more workers? if count < max_workers worker_count = max_workers - count logger.info "Starting #{worker_count} workers" worker_count.times.each do sleep (Config.instance.max_poll_seconds.to_f / max_workers) * (next_worker_id - 1) if stagger_workers return if shutdown? # Start worker begin - workers << Worker.new(id: next_worker_id, server_name: name) + workers << Worker.new(id: next_worker_id, server_name: name, filter: filter) rescue Exception => exc logger.fatal('Cannot start worker', exc) end end end