lib/rocket_job/server.rb in rocketjob-3.0.0.beta2 vs lib/rocket_job/server.rb in rocketjob-3.0.0.beta3
- old
+ new
@@ -28,10 +28,12 @@
class Server
include Plugins::Document
include Plugins::StateMachine
include SemanticLogger::Loggable
+ store_in collection: 'rocket_job.servers'
+
# Unique Name of this server instance
# Default: `host name:PID`
# The unique name is used on re-start to re-queue any jobs that were being processed
# at the time the server unexpectedly terminated, if any
field :name, type: String, default: -> { "#{SemanticLogger.host}:#{$$}" }
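
For context on the default above: SemanticLogger.host resolves to the machine's host name and $$ is the current process id, so the default server name is simply host:PID. A minimal illustration (the host name and PID shown are made up, not values from the gem):

    # Both values below are illustrative; they come from the running process.
    server = RocketJob::Server.new
    server.name # => "worker-1:12345"  (SemanticLogger.host => "worker-1", $$ => 12345)
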
@@ -41,10 +43,13 @@
field :max_workers, type: Integer, default: -> { Config.instance.max_workers }
# When this server process was started
field :started_at, type: Time
+ # Filter to apply to control which job classes this server can process
+ field :filter, type: Hash
+
# The heartbeat information for this server
embeds_one :heartbeat, class_name: 'RocketJob::Heartbeat'
# Current state
# Internal use only. Do not set this field directly
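
The new filter field suggests a server can be restricted to a subset of job classes, and the last hunk shows the filter being handed to each Worker. A hedged sketch of how such a filter might be supplied, assuming it is a Mongoid-style criteria Hash; the key and value shown are illustrative assumptions, not something this diff confirms:

    # Hypothetical example: restrict this server to one job class.
    # The '_type' key and the regexp are illustrative assumptions; the diff
    # only shows that the filter Hash is passed through to each Worker.
    server = RocketJob::Server.new(filter: { '_type' => /\AImportJob\z/ })
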
@@ -186,11 +191,10 @@
Mongoid::Tasks::Database.create_indexes
register_signal_handlers
server = create!(attrs)
server.send(:run)
-
ensure
server.destroy if server
end
# Returns [Boolean] whether the server is shutting down
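
The class method touched in the previous hunk creates the server document, runs the blocking management loop, and removes the document on the way out even if run raises. Reassembled from the lines above as a sketch; the method name and signature are assumptions, since only the body appears in this diff:

    # Sketch of the enclosing class method implied by the hunk above.
    def self.run(attrs = {})
      Mongoid::Tasks::Database.create_indexes   # make sure indexes exist
      register_signal_handlers                  # trap OS signals for clean shutdown
      server = create!(attrs)                   # persist this server instance
      server.send(:run)                         # blocking management loop
    ensure
      server.destroy if server                  # remove the document on exit
    end
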
@@ -223,26 +227,35 @@
# Management Thread
def run
logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}"
build_heartbeat(updated_at: Time.now, workers: 0)
started!
- adjust_workers(true)
- logger.info "RocketJob Server started with #{workers.size} workers running"
+ logger.info 'RocketJob Server started'
+ stagger = true
while running? || paused?
- sleep Config.instance.heartbeat_seconds
+ SemanticLogger.silence(:info) do
+ find_and_update(
+ 'heartbeat.updated_at' => Time.now,
+ 'heartbeat.workers' => worker_count
+ )
+ end
+ if paused?
+ workers.each(&:shutdown!)
+ stagger = true
+ end
- find_and_update(
- 'heartbeat.updated_at' => Time.now,
- 'heartbeat.workers' => worker_count
- )
-
# In case number of threads has been modified
- adjust_workers
+ adjust_workers(stagger)
+ stagger = false
# Stop server if shutdown indicator was set
- stop! if self.class.shutdown? && may_stop?
+ if self.class.shutdown? && may_stop?
+ stop!
+ else
+ sleep Config.instance.heartbeat_seconds
+ end
end
logger.info 'Waiting for workers to stop'
# Tell each worker to shutdown cleanly
workers.each(&:shutdown!)
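
Reassembled from the added (+) lines and unchanged context above, the beta3 loop body reads roughly as follows, with comments describing each step:

    while running? || paused?
      # Update the heartbeat; silence(:info) suppresses log output below
      # :info (such as driver query logging) for this write only.
      SemanticLogger.silence(:info) do
        find_and_update(
          'heartbeat.updated_at' => Time.now,
          'heartbeat.workers'    => worker_count
        )
      end
      # A paused server shuts its workers down; they are restarted with a
      # staggered first poll once the server resumes.
      if paused?
        workers.each(&:shutdown!)
        stagger = true
      end
      # Top the worker pool back up in case the worker count was changed.
      adjust_workers(stagger)
      stagger = false
      # Stop if a shutdown was signalled, otherwise wait for the next beat.
      if self.class.shutdown? && may_stop?
        stop!
      else
        sleep Config.instance.heartbeat_seconds
      end
    end
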
@@ -291,27 +304,29 @@
# Whether to stagger when the workers poll for work the first time
# It spreads out the queue polling over the max_poll_seconds so
# that not all workers poll at the same time
# The workers also respond faster than max_poll_seconds when a new
# job is added.
- def adjust_workers(stagger_workers=false)
+ def adjust_workers(stagger_workers = false)
count = worker_count
# Cleanup workers that have stopped
if count != workers.count
logger.info "Cleaning up #{workers.count - count} workers that went away"
workers.delete_if { |t| !t.alive? }
end
+ return unless running?
+
# Need to add more workers?
if count < max_workers
worker_count = max_workers - count
logger.info "Starting #{worker_count} workers"
worker_count.times.each do
sleep (Config.instance.max_poll_seconds.to_f / max_workers) * (next_worker_id - 1) if stagger_workers
return if shutdown?
# Start worker
begin
- workers << Worker.new(id: next_worker_id, server_name: name)
+ workers << Worker.new(id: next_worker_id, server_name: name, filter: filter)
rescue Exception => exc
logger.fatal('Cannot start worker', exc)
end
end
end
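
As a worked example of the stagger delay in the hunk above: each new worker's first poll is delayed by (max_poll_seconds / max_workers) * (next_worker_id - 1), which spreads the initial polls evenly across the poll interval. With illustrative values of max_poll_seconds = 5.0 and max_workers = 10 (example numbers, not taken from the gem's configuration):

    # Illustrative values only; the real ones come from Config.instance.
    max_poll_seconds = 5.0
    max_workers      = 10
    (1..3).each do |next_worker_id|
      delay = (max_poll_seconds / max_workers) * (next_worker_id - 1)
      puts "worker #{next_worker_id} first polls after #{delay}s"
    end
    # worker 1 first polls after 0.0s
    # worker 2 first polls after 0.5s
    # worker 3 first polls after 1.0s
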