lib/rocket_job/server.rb in rocketjob-4.0.0 vs lib/rocket_job/server.rb in rocketjob-4.1.0
- old
+ new
@@ -1,7 +1,8 @@
-require 'yaml'
-require 'concurrent'
+require 'rocket_job/server/model'
+require 'rocket_job/server/state_machine'
+
module RocketJob
# Server
#
# On startup a server instance will automatically register itself
# if not already present
@@ -27,361 +28,9 @@
# (the heartbeat interval) to start shutting down.
class Server
include Plugins::Document
include Plugins::StateMachine
include SemanticLogger::Loggable
-
- store_in collection: 'rocket_job.servers'
-
- # Unique name of this server instance
- # Default: `host name:PID`
- # On restart the unique name is used to requeue any jobs that were still
- # being processed when the server unexpectedly terminated
- field :name, type: String, default: -> { "#{SemanticLogger.host}:#{$$}" }
-
- # The maximum number of workers this server should start
- # If set, it will override the default value in RocketJob::Config
- field :max_workers, type: Integer, default: -> { Config.instance.max_workers }
-
- # When this server process was started
- field :started_at, type: Time
-
- # Filter to apply to control which job classes this server can process
- field :yaml_filter, type: String
-
- # The heartbeat information for this server
- embeds_one :heartbeat, class_name: 'RocketJob::Heartbeat'
-
- # Current state
- # Internal use only. Do not set this field directly
- field :state, type: Symbol, default: :starting
-
- index({name: 1}, background: true, unique: true, drop_dups: true)
-
- validates_presence_of :state, :name, :max_workers
-
- # States
- #   :starting -> :running <-> :paused
- #   :starting, :running and :paused can each transition to :stopping
- aasm column: :state, whiny_persistence: true do
- state :starting, initial: true
- state :running
- state :paused
- state :stopping
-
- event :started do
- transitions from: :starting, to: :running
- before do
- self.started_at = Time.now
- end
- end
-
- event :pause do
- transitions from: :running, to: :paused
- end
-
- event :resume do
- transitions from: :paused, to: :running
- end
-
- event :stop do
- transitions from: :running, to: :stopping
- transitions from: :paused, to: :stopping
- transitions from: :starting, to: :stopping
- end
- end
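
The aasm block generates a bang method per event that persists the transition (with whiny_persistence: true a failed save raises). A minimal console sketch, assuming a configured Mongoid connection, of walking a server through its lifecycle:

    server = RocketJob::Server.create!  # state: :starting
    server.started!                     # :starting -> :running, sets started_at
    server.pause!                       # :running  -> :paused
    server.resume!                      # :paused   -> :running
    server.stop!                        # :running  -> :stopping
    server.stopping?                    # => true
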
-
- # Requeue any jobs being worked by this server when it is destroyed
- before_destroy :requeue_jobs
-
- # Destroys all zombie servers and requeues any jobs still "running"
- # on those servers.
- def self.destroy_zombies
- count = 0
- each do |server|
- next unless server.zombie?
- logger.warn "Destroying zombie server #{server.name}, and requeueing its jobs"
- server.destroy
- count += 1
- end
- count
- end
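
destroy_zombies leans on the before_destroy callback above, so destroying a dead server document also requeues its in-flight jobs. A periodic sweep might look like this (a sketch):

    cleaned = RocketJob::Server.destroy_zombies
    puts "Requeued jobs from #{cleaned} dead servers" if cleaned.positive?
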
-
- # Stop all running, paused, or starting servers
- def self.stop_all
- where(:state.in => %i[running paused starting]).each(&:stop!)
- end
-
- # Pause all running servers
- def self.pause_all
- running.each(&:pause!)
- end
-
- # Resume all paused servers
- def self.resume_all
- paused.each(&:resume!)
- end
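
Together these class methods give an operator coarse-grained control over a whole cluster, for example draining it for a maintenance window (a sketch, run from a console):

    RocketJob::Server.pause_all    # running servers stop picking up new jobs
    # ... perform maintenance ...
    RocketJob::Server.resume_all   # paused servers resume processing
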
-
- # Returns [Hash<Symbol, Integer>] of the number of servers in each state.
- # Note: States with no servers in them are omitted from the hash.
- #
- # Example, servers in every state:
- #   RocketJob::Server.counts_by_state
- #   # => {
- #   #      :starting => 2,
- #   #      :running  => 12,
- #   #      :paused   => 1,
- #   #      :stopping => 3
- #   #    }
- #
- # Example no servers active:
- # RocketJob::Server.counts_by_state
- # # => {}
- def self.counts_by_state
- counts = {}
- collection.aggregate(
- [
- {
- '$group' => {
- _id: '$state',
- count: {'$sum' => 1}
- }
- }
- ]
- ).each do |result|
- counts[result['_id'].to_sym] = result['count']
- end
- counts
- end
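
Usage sketch; the keys come back as symbols because each aggregation result's '_id' (the state) is converted with to_sym:

    RocketJob::Server.counts_by_state
    # => {:running => 8, :paused => 1}   # states with no servers are omitted
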
-
- # Use Concurrent::AtomicBoolean only when a native implementation is available
- # (Java on JRuby, or C when the 'concurrent-ruby-ext' gem is loaded on MRI).
- # The pure-Ruby fallback locks a Mutex, which MRI forbids inside a trap handler.
- if defined?(Concurrent::JavaAtomicBoolean) || defined?(Concurrent::CAtomicBoolean)
- # Returns [true|false] whether the shutdown indicator has been set for this server process
- def self.shutdown?
- @shutdown.value
- end
-
- # Set shutdown indicator for this server process
- def self.shutdown!
- @shutdown.make_true
- end
-
- @shutdown = Concurrent::AtomicBoolean.new(false)
- else
- # Returns [true|false] whether the shutdown indicator has been set for this server process
- def self.shutdown?
- @shutdown
- end
-
- # Set shutdown indicator for this server process
- def self.shutdown!
- @shutdown = true
- end
-
- @shutdown = false
- end
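
The distinction matters inside signal handlers: the native C/Java atomics can be flipped from a trap context, whereas the pure-Ruby Concurrent::AtomicBoolean locks a Mutex, which MRI rejects there. The same fallback pattern in isolation (a sketch, independent of RocketJob):

    require 'concurrent'

    if defined?(Concurrent::CAtomicBoolean) || defined?(Concurrent::JavaAtomicBoolean)
      flag = Concurrent::AtomicBoolean.new(false)   # native: safe in a trap handler
      trap('INT') { flag.make_true }
      sleep(0.1) until flag.true?
    else
      flag = false
      trap('INT') { flag = true }                   # plain assignment needs no lock
      sleep(0.1) until flag
    end
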
-
- # Run the server process
- # Attributes supplied are passed to #new
- def self.run(attrs = {})
- Thread.current.name = 'rocketjob main'
- # Create Indexes on server startup
- ::Mongoid::Tasks::Database.create_indexes
- register_signal_handlers
-
- server = create!(attrs)
- server.send(:run)
- ensure
- server&.destroy
- end
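
Server.run is the process entry point; attributes flow straight into create!, so a process can be booted with an overridden worker count (a sketch; Mongoid must be configured first):

    RocketJob::Server.run(max_workers: 5)

Note the ensure clause: the server document is destroyed on the way out, which triggers the requeue_jobs callback below.
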
-
- # Returns [Boolean] whether the server is shutting down
- def shutdown?
- self.class.shutdown? || !running?
- end
-
- # Scope for all zombie servers
- def self.zombies(missed = 4)
- dead_seconds = Config.instance.heartbeat_seconds * missed
- last_heartbeat_time = Time.now - dead_seconds
- where(
- :state.in => %i[stopping running paused],
- '$or' => [
- {'heartbeat.updated_at' => {'$exists' => false}},
- {'heartbeat.updated_at' => {'$lte' => last_heartbeat_time}}
- ]
- )
- end
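
The zombie threshold is heartbeat_seconds * missed. For example, assuming a default heartbeat interval of 15 seconds:

    heartbeat_seconds = 15.0    # assumed Config.instance.heartbeat_seconds default
    missed            = 4
    heartbeat_seconds * missed  # => 60.0 seconds of silence before a server is a zombie
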
-
- # Returns [true|false] whether this server has missed at least the last
- # `missed` heartbeats (default: 4)
- #
- # Possible causes for a server to miss its heartbeats:
- # - The server process has died
- # - The server process is "hanging"
- # - The server is no longer able to communicate with the MongoDB Server
- def zombie?(missed = 4)
- return false unless running? || stopping? || paused?
- return true if heartbeat.nil? || heartbeat.updated_at.nil?
- dead_seconds = Config.instance.heartbeat_seconds * missed
- (Time.now - heartbeat.updated_at) >= dead_seconds
- end
-
- # Returns [Hash, nil] the where clause filter that limits which jobs
- # this server's workers will process
- def filter
- YAML.load(yaml_filter) if yaml_filter
- end
-
- def filter=(hash)
- self.yaml_filter = hash.nil? ? nil : hash.to_yaml
- end
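
The filter round-trips a Hash through YAML so it can live in the plain yaml_filter String field; workers merge it into their query when polling. A sketch with a hypothetical criterion:

    server.filter = { _type: 'ImportJob' }   # hypothetical filter; stored as YAML
    server.yaml_filter                       # => "---\n:_type: ImportJob\n"
    server.filter                            # => {:_type=>"ImportJob"}
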
-
- private
-
- # Returns [Array<Worker>] collection of workers
- def workers
- @workers ||= []
- end
-
- # Management Thread
- def run
- logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}"
- logger.info('Running with filter', filter) if filter
- build_heartbeat(updated_at: Time.now, workers: 0)
- started!
- logger.info 'Rocket Job Server started'
-
- run_workers
-
- logger.info 'Waiting for workers to stop'
- # Tell each worker to shutdown cleanly
- workers.each(&:shutdown!)
-
- while (worker = workers.first)
- if worker.join(5)
- # Worker thread is dead
- workers.shift
- else
- # Timeout waiting for worker to stop
- find_and_update(
- 'heartbeat.updated_at' => Time.now,
- 'heartbeat.workers' => worker_count
- )
- end
- end
-
- logger.info 'Shutdown'
- rescue ::Mongoid::Errors::DocumentNotFound
- logger.warn('Server has been destroyed. Going down hard!')
- rescue Exception => exc
- logger.error('RocketJob::Server is stopping due to an exception', exc)
- ensure
- # Logs the backtrace for each running worker
- workers.each { |worker| logger.backtrace(thread: worker.thread) if worker.thread && worker.alive? }
- end
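
The worker-join loop above doubles as the shutdown heartbeat: Thread#join(5) bounds each wait, and every timeout refreshes heartbeat.updated_at so the server is not declared a zombie while workers drain. The same pattern with plain threads (a sketch):

    threads = 3.times.map { |i| Thread.new { sleep(i * 7) } }
    while (thread = threads.first)
      if thread.join(5)  # returns the thread once it has finished
        threads.shift
      else               # nil on timeout: still alive, so refresh the heartbeat
        puts 'still waiting; heartbeat refreshed'
      end
    end
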
-
- def run_workers
- stagger = true
- while running? || paused?
- SemanticLogger.silence(:info) do
- find_and_update(
- 'heartbeat.updated_at' => Time.now,
- 'heartbeat.workers' => worker_count
- )
- end
- if paused?
- workers.each(&:shutdown!)
- stagger = true
- end
-
- # In case number of threads has been modified
- adjust_workers(stagger)
- stagger = false
-
- # Stop server if shutdown indicator was set
- if self.class.shutdown? && may_stop?
- stop!
- else
- sleep Config.instance.heartbeat_seconds
- end
- end
- end
-
- # Returns [Integer] number of workers (threads) that are alive
- def worker_count
- workers.count(&:alive?)
- end
-
- def next_worker_id
- @worker_id ||= 0
- @worker_id += 1
- end
-
- # Adjust the number of running workers up to the required number.
- # Parameters
- #   stagger_workers
- #     Whether to stagger the workers' first poll for work.
- #     Spreads the initial queue polling over max_poll_seconds so that
- #     not all workers poll at the same time.
- #     Workers still respond faster than max_poll_seconds when a new
- #     job is added.
- def adjust_workers(stagger_workers = false)
- count = worker_count
- # Cleanup workers that have stopped
- if count != workers.count
- logger.info "Cleaning up #{workers.count - count} workers that went away"
- workers.delete_if { |t| !t.alive? }
- end
-
- return unless running?
-
- # Need to add more workers?
- return unless count < max_workers
-
- # Use a new local so as not to shadow the #worker_count method above
- add_count = max_workers - count
- logger.info "Starting #{add_count} workers"
- add_count.times do
- sleep(Config.instance.max_poll_seconds.to_f / max_workers) if stagger_workers
- return if shutdown?
- # Start worker
- begin
- workers << Worker.new(id: next_worker_id, server_name: name, filter: filter)
- rescue Exception => exc
- logger.fatal('Cannot start worker', exc)
- end
- end
- end
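
The stagger delay is max_poll_seconds / max_workers, so a full complement of cold workers spreads its first polls across one poll interval. For example, assuming a default max_poll_seconds of 5:

    5.0 / 10   # => 0.5s between each of 10 workers' first poll
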
-
- # Register handlers for the various signals
- # SIGTERM, SIGINT:
- #   Perform clean shutdown
- #
- def self.register_signal_handlers
- Signal.trap 'SIGTERM' do
- shutdown!
- message = 'Shutdown signal (SIGTERM) received. Will shutdown as soon as active jobs/slices have completed.'
- # SemanticLogger's queue is guarded by a Mutex, which MRI cannot lock
- # inside a trap handler, so fall back to puts on MRI/CRuby
- defined?(JRuby) ? logger.warn(message) : puts(message)
- end
-
- Signal.trap 'INT' do
- shutdown!
- message = 'Shutdown signal (INT) received. Will shutdown as soon as active jobs/slices have completed.'
- # SemanticLogger's queue is guarded by a Mutex, which MRI cannot lock
- # inside a trap handler, so fall back to puts on MRI/CRuby
- defined?(JRuby) ? logger.warn(message) : puts(message)
- end
- rescue StandardError
- logger.warn 'Signal handlers not installed. Unable to shut down gracefully'
- end
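
A minimal reproduction of why puts is used instead of logger on MRI: locking a Mutex from a trap context raises, and SemanticLogger's appender queue is Mutex-guarded (a sketch; raises ThreadError on MRI 2.x):

    mutex = Mutex.new
    trap('USR1') { mutex.synchronize { puts 'never reached' } }
    Process.kill('USR1', Process.pid)
    sleep(0.1)   # ThreadError: can't be called from trap context
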
-
- private_class_method :register_signal_handlers
-
- # Requeue any jobs assigned to this server when it is destroyed
- def requeue_jobs
- RocketJob::Job.requeue_dead_server(name)
- end
+ include Server::Model
+ include Server::StateMachine
end
end
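
For reference, the whole 4.1.0 class body reduces to the two includes; the removed fields, indexes, validations and aasm definitions move into the new files. A plausible skeleton of the extraction (the real contents live in lib/rocket_job/server/model.rb and lib/rocket_job/server/state_machine.rb):

    module RocketJob
      class Server
        # Fields, indexes, validations and scopes extracted from Server
        module Model
          extend ActiveSupport::Concern

          included do
            store_in collection: 'rocket_job.servers'
            field :name, type: String, default: -> { "#{SemanticLogger.host}:#{$$}" }
            # ... remaining fields, index and validations ...
          end
        end
      end
    end
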