lib/rocket_job/worker.rb in rocketjob-1.0.0 vs lib/rocket_job/worker.rb in rocketjob-1.1.0

- old
+ new

@@ -34,10 +34,11 @@ include SemanticLogger::Loggable # Prevent data in MongoDB from re-defining the model behavior #self.static_keys = true + # @formatter:off # Unique Name of this worker instance # Defaults to the `hostname` but _must_ be overriden if mutiple Worker instances # are started on the same host # The unique name is used on re-start to re-queue any jobs that were being processed # at the time the worker or host unexpectedly terminated, if any @@ -84,10 +85,11 @@ transitions from: :running, to: :stopping transitions from: :paused, to: :stopping transitions from: :starting, to: :stopping end end + # @formatter:on attr_reader :thread_pool # Requeue any jobs being worked by this worker when it is destroyed before_destroy :requeue_jobs @@ -98,11 +100,11 @@ worker = new(attrs) worker.build_heartbeat worker.save! create_indexes register_signal_handlers - raise "The RocketJob configuration is being applied after the system has been initialized" unless RocketJob::Job.database.name == RocketJob::SlicedJob.database.name + raise 'The RocketJob configuration is being applied after the system has been initialized' unless RocketJob::Job.database.name == RocketJob::SlicedJob.database.name logger.info "Using MongoDB Database: #{RocketJob::Job.database.name}" worker.run end # Create indexes @@ -110,43 +112,40 @@ ensure_index [[:name, 1]], background: true, unique: true # Also create indexes for the jobs collection Job.create_indexes end - # Destroy dead workers ( missed at least the last 4 heartbeats ) - # Requeue jobs assigned to dead workers - # Destroy dead workers - def self.destroy_dead_workers - dead_seconds = Config.instance.heartbeat_seconds * 4 + # Destroy's all instances of zombie workers and requeues any jobs still "running" + # on those workers + def self.destroy_zombies each do |worker| - next if (Time.now - worker.heartbeat.updated_at) < dead_seconds - logger.warn "Destroying worker #{worker.name}, and requeueing its jobs" + next unless zombie? + logger.warn "Destroying zombie worker #{worker.name}, and requeueing its jobs" worker.destroy end end + def self.destroy_dead_workers + warn 'RocketJob::Worker.destroy_dead_workers is deprecated, use RocketJob::Worker.destroy_zombies' + destroy_zombies + end + # Stop all running, paused, or starting workers def self.stop_all - where(state: ['running', 'paused', 'starting']).each { |worker| worker.stop! } + where(state: [:running, :paused, :starting]).each(&:stop!) end # Pause all running workers def self.pause_all - where(state: 'running').each { |worker| worker.pause! } + running.each(&:pause!) end # Resume all paused workers def self.resume_all - each { |worker| worker.resume! if worker.paused? } + paused.each(&:resume!) end - # Register a handler to perform cleanups etc. whenever a worker is - # explicitly destroyed - def self.register_destroy_handler(&block) - @@destroy_handlers << block - end - # Returns [Boolean] whether the worker is shutting down def shutting_down? if self.class.shutdown stop! if running? true @@ -160,11 +159,11 @@ @thread_pool ||= [] end # Run this instance of the worker def run - Thread.current.name = 'RocketJob main' + Thread.current.name = 'rocketjob main' build_heartbeat unless heartbeat started adjust_thread_pool(true) save @@ -209,13 +208,24 @@ # Destroy this worker instance destroy end def thread_pool_count - thread_pool.count{ |i| i.alive? } + thread_pool.count { |i| i.alive? } end + # Returns [true|false] if this worker has missed at least the last 4 heartbeats + # + # Possible causes for a worker to miss its heartbeats: + # - The worker process has died + # - The worker process is "hanging" + # - The worker is no longer able to communicate with the MongoDB Server + def zombie?(missed = 4) + dead_seconds = Config.instance.heartbeat_seconds * missed + (Time.now - worker.heartbeat.updated_at) >= dead_seconds + end + protected def next_worker_id @worker_id ||= 0 @worker_id += 1 @@ -277,11 +287,11 @@ end # Process the next available job # Returns [Boolean] whether any job was actually processed def process_next_job - skip_job_ids = [] + skip_job_ids = [] while job = Job.next_job(name, skip_job_ids) logger.tagged("Job #{job.id}") do if job.work(self) return true if shutting_down? # Need to skip the specified job due to throttling or no work available @@ -295,11 +305,11 @@ end # Requeue any jobs assigned to this worker def requeue_jobs stop! if running? || paused? - @@destroy_handlers.each { |handler| handler.call(name) } + RocketJob::Job.requeue_dead_worker(name) end # Mutex protected shutdown indicator sync_cattr_accessor :shutdown do false @@ -309,38 +319,37 @@ # Term: # Perform clean shutdown # def self.register_signal_handlers begin - Signal.trap "SIGTERM" do + Signal.trap 'SIGTERM' do # Cannot use Mutex protected writer here since it is in a signal handler @@shutdown = true - logger.warn "Shutdown signal (SIGTERM) received. Will shutdown as soon as active jobs/slices have completed." + logger.warn 'Shutdown signal (SIGTERM) received. Will shutdown as soon as active jobs/slices have completed.' end - Signal.trap "INT" do + Signal.trap 'INT' do # Cannot use Mutex protected writer here since it is in a signal handler @@shutdown = true - logger.warn "Shutdown signal (INT) received. Will shutdown as soon as active jobs/slices have completed." + logger.warn 'Shutdown signal (INT) received. Will shutdown as soon as active jobs/slices have completed.' end - rescue Exception - logger.warn "SIGTERM handler not installed. Not able to shutdown gracefully" + rescue StandardError + logger.warn 'SIGTERM handler not installed. Not able to shutdown gracefully' end end # Patch the way MongoMapper reloads a model def reload if doc = collection.find_one(:_id => id) + # Clear out keys that are not returned during the reload from MongoDB + (keys.keys - doc.keys).each { |key| send("#{key}=", nil) } + initialize_default_values load_from_database(doc) self else raise MongoMapper::DocumentNotFound, "Document match #{_id.inspect} does not exist in #{collection.name} collection" end end - - private - - @@destroy_handlers = ThreadSafe::Array.new end end