lib/rocket_job/worker.rb in rocketjob-1.3.0 vs lib/rocket_job/worker.rb in rocketjob-2.0.0.rc1

- old
+ new

@@ -1,8 +1,8 @@
 # encoding: UTF-8
 require 'socket'
-require 'aasm'
+require 'concurrent'
 module RocketJob
   # Worker
   #
   # On startup a worker instance will automatically register itself
   # if not already present
@@ -25,17 +25,14 @@
   #
   # Sending the kill signal locally will result in starting the shutdown process
   # immediately. Via the UI or Ruby code the worker can take up to 15 seconds
   # (the heartbeat interval) to start shutting down.
   class Worker
-    include MongoMapper::Document
-    include AASM
+    include Plugins::Document
+    include Plugins::StateMachine
     include SemanticLogger::Loggable

-    # Prevent data in MongoDB from re-defining the model behavior
-    #self.static_keys = true
-
     # @formatter:off
     # Unique Name of this worker instance
     # Defaults to the `hostname` but _must_ be overridden if multiple Worker instances
     # are started on the same host
     # The unique name is used on re-start to re-queue any jobs that were being processed
@@ -71,42 +68,53 @@
         transitions from: :starting, to: :running
         before do
           self.started_at = Time.now
         end
       end
+
       event :pause do
         transitions from: :running, to: :paused
       end
+
       event :resume do
         transitions from: :paused, to: :running
       end
+
       event :stop do
         transitions from: :running,  to: :stopping
         transitions from: :paused,   to: :stopping
         transitions from: :starting, to: :stopping
       end
     end
     # @formatter:on

-    attr_reader :thread_pool
-
     # Requeue any jobs being worked by this worker when it is destroyed
     before_destroy :requeue_jobs

     # Run the worker process
     # Attributes supplied are passed to #new
     def self.run(attrs={})
-      worker = new(attrs)
-      worker.build_heartbeat
-      worker.save!
+      Thread.current.name = 'rocketjob main'
       create_indexes
       register_signal_handlers
-      if defined?(RocketJobPro) && (RocketJob::Job.database.name != RocketJob::SlicedJob.database.name)
+      if defined?(RocketJobPro) && (RocketJob::Job.database.name != RocketJob::Jobs::PerformanceJob.database.name)
         raise 'The RocketJob configuration is being applied after the system has been initialized'
       end
-      logger.info "Using MongoDB Database: #{RocketJob::Job.database.name}"
-      worker.run
+
+      worker = create!(attrs)
+      if worker.max_threads == 0
+        # Does not start any additional threads and runs the worker in the current thread.
+        # No heartbeats are performed, so this worker will appear as a zombie in RJMC.
+        # Designed for profiling purposes where a single thread is much simpler to profile.
+        worker.started!
+        worker.send(:worker, 0)
+      else
+        worker.send(:run)
+      end
+
+    ensure
+      worker.destroy if worker
     end

     # Create indexes
     def self.create_indexes
       ensure_index [[:name, 1]], background: true, unique: true
@@ -125,15 +133,10 @@
         count += 1
       end
       count
     end

-    def self.destroy_dead_workers
-      warn 'RocketJob::Worker.destroy_dead_workers is deprecated, use RocketJob::Worker.destroy_zombies'
-      destroy_zombies
-    end
-
     # Stop all running, paused, or starting workers
     def self.stop_all
       where(state: [:running, :paused, :starting]).each(&:stop!)
     end
@@ -145,93 +148,139 @@
     # Resume all paused workers
     def self.resume_all
       paused.each(&:resume!)
     end

+    # Returns [Hash<String:Integer>] of the number of workers in each state.
+    # Note: If there are no workers in that particular state then the hash will not have a value for it.
+    #
+    # Example workers in every state:
+    #   RocketJob::Worker.counts_by_state
+    #   # => {
+    #          :aborted => 1,
+    #          :completed => 37,
+    #          :failed => 1,
+    #          :paused => 3,
+    #          :queued => 4,
+    #          :running => 1,
+    #          :queued_now => 1,
+    #          :scheduled => 3
+    #        }
+    #
+    # Example no workers active:
+    #   RocketJob::Worker.counts_by_state
+    #   # => {}
+    def self.counts_by_state
+      counts = {}
+      collection.aggregate([
+        {
+          '$group' => {
+            _id:   '$state',
+            count: {'$sum' => 1}
+          }
+        }
+      ]
+      ).each do |result|
+        counts[result['_id']] = result['count']
+      end
+      counts
+    end
+
     # Returns [Boolean] whether the worker is shutting down
     def shutting_down?
-      if self.class.shutdown
-        stop! if running?
-        true
-      else
-        !running?
+      self.class.shutdown? || !running?
+    end
+
+    # Returns [true|false] if this worker has missed at least the last 4 heartbeats
+    #
+    # Possible causes for a worker to miss its heartbeats:
+    # - The worker process has died
+    # - The worker process is "hanging"
+    # - The worker is no longer able to communicate with the MongoDB Server
+    def zombie?(missed = 4)
+      return false unless running?
+      return true if heartbeat.updated_at.nil?
+      dead_seconds = Config.instance.heartbeat_seconds * missed
+      (Time.now - heartbeat.updated_at) >= dead_seconds
+    end
+
+    # On MRI the 'concurrent-ruby-ext' gem may not be loaded
+    if defined?(Concurrent::JavaAtomicBoolean) || defined?(Concurrent::CAtomicBoolean)
+      # Returns [true|false] whether the shutdown indicator has been set for this worker process
+      def self.shutdown?
+        @@shutdown.value
       end
+
+      # Set shutdown indicator for this worker process
+      def self.shutdown!
+        @@shutdown.make_true
+      end
+
+      @@shutdown = Concurrent::AtomicBoolean.new(false)
+    else
+      # Returns [true|false] whether the shutdown indicator has been set for this worker process
+      def self.shutdown?
+        @@shutdown
+      end
+
+      # Set shutdown indicator for this worker process
+      def self.shutdown!
+        @@shutdown = true
+      end
+
+      @@shutdown = false
     end

-    # Returns [Array<Thread>] threads in the thread_pool
-    def thread_pool
-      @thread_pool ||= []
+    private
+
+    attr_reader :worker_threads
+
+    # Returns [Array<Thread>] collection of created worker threads
+    def worker_threads
+      @worker_threads ||= []
     end

     # Management Thread
     def run
-      Thread.current.name = 'rocketjob main'
-      build_heartbeat unless heartbeat
-
-      started
-      adjust_thread_pool(true)
-      save
+      logger.info "Using MongoDB Database: #{RocketJob::Job.database.name}"
+      build_heartbeat(updated_at: Time.now, current_threads: 0)
+      started!
+      adjust_worker_threads(true)
       logger.info "RocketJob Worker started with #{max_threads} workers running"
       count = 0
-      loop do
-        # Update heartbeat so that monitoring tools know that this worker is alive
-        set(
+      while running? || paused?
+        sleep Config.instance.heartbeat_seconds
+
+        update_attributes_and_reload(
           'heartbeat.updated_at'      => Time.now,
-          'heartbeat.current_threads' => thread_pool_count
+          'heartbeat.current_threads' => worker_count
         )
-        # Reload the worker model every few heartbeats in case its config was changed
-        # TODO make 3 configurable
-        if count >= 3
-          reload
-          adjust_thread_pool
-          count = 0
-        else
-          count += 1
-        end
+        # In case number of threads has been modified
+        adjust_worker_threads

-        # Stop worker if shutdown signal was raised
-        stop! if self.class.shutdown && !stopping?
-
-        break if stopping?
-
-        sleep Config.instance.heartbeat_seconds
+        # Stop worker if shutdown indicator was set
+        stop! if self.class.shutdown? && may_stop?
       end

       logger.info 'Waiting for worker threads to stop'
       # TODO Put a timeout on join.
       # Log Thread dump for active threads
       # Compare thread dumps for any changes, force down if no change?
       # reload, if model missing: Send Shutdown exception to each thread
       # 5 more seconds then exit
-      thread_pool.each { |t| t.join }
+      worker_threads.each { |t| t.join }
       logger.info 'Shutdown'
     rescue Exception => exc
       logger.error('RocketJob::Worker is stopping due to an exception', exc)
-    ensure
-      # Destroy this worker instance
-      destroy
     end

-    def thread_pool_count
-      thread_pool.count { |i| i.alive? }
+    # Returns [Fixnum] number of workers (threads) that are alive
+    def worker_count
+      worker_threads.count { |i| i.alive? }
     end

-    # Returns [true|false] if this worker has missed at least the last 4 heartbeats
-    #
-    # Possible causes for a worker to miss its heartbeats:
-    # - The worker process has died
-    # - The worker process is "hanging"
-    # - The worker is no longer able to communicate with the MongoDB Server
-    def zombie?(missed = 4)
-      return false unless running?
-      dead_seconds = Config.instance.heartbeat_seconds * missed
-      (Time.now - heartbeat.updated_at) >= dead_seconds
-    end
-
-    protected
-
     def next_worker_id
       @worker_id ||= 0
       @worker_id += 1
     end
@@ -242,27 +291,27 @@
     # Whether to stagger when the threads poll for work the first time
     # It spreads out the queue polling over the max_poll_seconds so
     # that not all workers poll at the same time
    # The worker also responds faster than max_poll_seconds when a new
     # job is added.
-    def adjust_thread_pool(stagger_threads=false)
-      count = thread_pool_count
+    def adjust_worker_threads(stagger_threads=false)
+      count = worker_count

       # Cleanup threads that have stopped
-      if count != thread_pool.count
-        logger.info "Cleaning up #{thread_pool.count - count} threads that went away"
-        thread_pool.delete_if { |t| !t.alive? }
+      if count != worker_threads.count
+        logger.info "Cleaning up #{worker_threads.count - count} threads that went away"
+        worker_threads.delete_if { |t| !t.alive? }
       end

       return if shutting_down?

       # Need to add more threads?
       if count < max_threads
         thread_count = max_threads - count
         logger.info "Starting #{thread_count} threads"
         thread_count.times.each do
           # Start worker thread
-          thread_pool << Thread.new(next_worker_id) do |id|
+          worker_threads << Thread.new(next_worker_id) do |id|
             begin
               sleep (Config.instance.max_poll_seconds.to_f / max_threads) * (id - 1) if stagger_threads
               worker(id)
             rescue Exception => exc
               logger.fatal('Cannot start worker thread', exc)
@@ -272,89 +321,71 @@
           end
         end
       end
     end

     # Keep processing jobs until worker stops running
     def worker(worker_id)
-      Thread.current.name = "rocketjob #{worker_id}"
+      Thread.current.name = 'rocketjob %03i' % worker_id
       logger.info 'Started'
       while !shutting_down?
-        if process_next_job
-          # Keeps workers staggered across the poll interval so that not
-          # all workers poll at the same time
+        if process_available_jobs
+          # Keeps workers staggered across the poll interval so that
+          # all workers don't poll at the same time
           sleep rand(RocketJob::Config.instance.max_poll_seconds * 1000) / 1000
         else
+          break if shutting_down?
           sleep RocketJob::Config.instance.max_poll_seconds
         end
       end
       logger.info "Stopping. Worker state: #{state.inspect}"
     rescue Exception => exc
       logger.fatal('Unhandled exception in job processing thread', exc)
     end

     # Process the next available job
     # Returns [Boolean] whether any job was actually processed
-    def process_next_job
+    def process_available_jobs
       skip_job_ids = []
-      while job = Job.next_job(name, skip_job_ids)
-        logger.tagged("Job #{job.id}") do
-          if job.work(self)
-            return true if shutting_down?
+      processed = false
+      while (job = Job.rocket_job_next_job(name, skip_job_ids)) && !shutting_down?
+        logger.fast_tag("Job #{job.id}") do
+          if job.rocket_job_work(self)
             # Need to skip the specified job due to throttling or no work available
             skip_job_ids << job.id
           else
-            return true
+            processed = true
           end
         end
       end
-      false
+      processed
     end

-    # Requeue any jobs assigned to this worker
-    def requeue_jobs
-      stop! if running? || paused?
-      RocketJob::Job.requeue_dead_worker(name)
-    end
-
-    # Shutdown indicator
-    def self.shutdown
-      @@shutdown
-    end
-
-    @@shutdown = false
-
     # Register handlers for the various signals
     # Term:
     #   Perform clean shutdown
     #
     def self.register_signal_handlers
       begin
         Signal.trap 'SIGTERM' do
-          # Cannot use Mutex protected writer here since it is in a signal handler
-          @@shutdown = true
-          logger.warn 'Shutdown signal (SIGTERM) received. Will shutdown as soon as active jobs/slices have completed.'
+          shutdown!
+          message = 'Shutdown signal (SIGTERM) received. Will shutdown as soon as active jobs/slices have completed.'
+          # Logging uses a mutex to access Queue on MRI/CRuby
+          defined?(JRuby) ? logger.warn(message) : puts(message)
         end

         Signal.trap 'INT' do
-          # Cannot use Mutex protected writer here since it is in a signal handler
-          @@shutdown = true
-          logger.warn 'Shutdown signal (INT) received. Will shutdown as soon as active jobs/slices have completed.'
+          shutdown!
+          message = 'Shutdown signal (INT) received. Will shutdown as soon as active jobs/slices have completed.'
+          # Logging uses a mutex to access Queue on MRI/CRuby
+          defined?(JRuby) ? logger.warn(message) : puts(message)
         end
       rescue StandardError
         logger.warn 'SIGTERM handler not installed. Not able to shutdown gracefully'
       end
     end

-    # Patch the way MongoMapper reloads a model
-    def reload
-      if doc = collection.find_one(:_id => id)
-        # Clear out keys that are not returned during the reload from MongoDB
-        (keys.keys - doc.keys).each { |key| send("#{key}=", nil) }
-        initialize_default_values
-        load_from_database(doc)
-        self
-      else
-        raise MongoMapper::DocumentNotFound, "Document match #{_id.inspect} does not exist in #{collection.name} collection"
-      end
+    # Requeue any jobs assigned to this worker when it is destroyed
+    def requeue_jobs
+      RocketJob::Job.requeue_dead_worker(name)
     end
   end
 end
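
Notes on the new code (illustrative sketches; any names and values not shown in the diff above are assumptions).

Worker.run now creates the worker document itself and destroys it in an ensure block. A minimal launch sketch, assuming MongoDB is reachable and RocketJob is configured; the attribute values are examples only:

    require 'socket'
    require 'rocketjob'

    # `name` must be unique per worker instance when several run on the same host.
    # Blocks until a shutdown signal (SIGTERM or INT) is received.
    RocketJob::Worker.run(name: "#{Socket.gethostname}:1", max_threads: 10)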
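
The shutdown flag moved from a bare @@shutdown class variable to Concurrent::AtomicBoolean when the C or Java extension is loaded, so it can be flipped from a signal handler and polled by many threads without taking a lock. A standalone sketch of the same pattern using only the concurrent-ruby gem:

    require 'concurrent'

    # Process-wide flag: set from the trap handler, polled by worker threads.
    shutdown = Concurrent::AtomicBoolean.new(false)

    Signal.trap('TERM') { shutdown.make_true }

    workers = Array.new(3) do |id|
      Thread.new do
        sleep 0.1 until shutdown.true? # stand-in for the job polling loop
        puts "worker #{id} stopped"
      end
    end

    Process.kill('TERM', Process.pid) # simulate a deploy sending SIGTERM
    workers.each(&:join)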
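
counts_by_state issues a single $group aggregation instead of one count query per state, which is why an empty collection yields {} rather than zero-valued keys. The same pipeline against a plain collection via the official mongo driver; the connection details and collection name here are assumptions:

    require 'mongo'

    client  = Mongo::Client.new(['127.0.0.1:27017'], database: 'rocketjob')
    workers = client[:workers] # hypothetical collection name

    pipeline = [{ '$group' => { '_id' => '$state', 'count' => { '$sum' => 1 } } }]

    counts = workers.aggregate(pipeline).each_with_object({}) do |doc, h|
      h[doc['_id']] = doc['count']
    end
    p counts # => e.g. {"running"=>3, "paused"=>1}; {} when no workers exist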
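
zombie? now also treats a running worker that has never written a heartbeat as dead (the new heartbeat.updated_at.nil? guard). The threshold arithmetic, extracted into plain Ruby; 15 seconds is the heartbeat interval cited in the file's own comments:

    HEARTBEAT_SECONDS = 15

    # A running worker is presumed dead after `missed` consecutive
    # heartbeat intervals without an update.
    def zombie?(last_heartbeat_at, missed = 4)
      return true if last_heartbeat_at.nil?
      (Time.now - last_heartbeat_at) >= HEARTBEAT_SECONDS * missed
    end

    p zombie?(Time.now - 30) # => false, only 2 of 4 intervals missed
    p zombie?(Time.now - 61) # => true, past the 4 x 15s threshold
    p zombie?(nil)           # => true, never heartbeated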
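
adjust_worker_threads staggers each thread's first poll by (max_poll_seconds / max_threads) * (id - 1): worker ids start at 1, so the first thread polls immediately and the rest are spread evenly across one poll interval. Worked through with assumed values:

    MAX_POLL_SECONDS = 5.0 # assumed value of Config.instance.max_poll_seconds
    MAX_THREADS      = 4

    (1..MAX_THREADS).each do |id|
      delay = (MAX_POLL_SECONDS / MAX_THREADS) * (id - 1)
      puts format('worker %03i first polls at +%.2fs', id, delay)
    end
    # worker 001 first polls at +0.00s
    # worker 002 first polls at +1.25s
    # worker 003 first polls at +2.50s
    # worker 004 first polls at +3.75s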