lib/resque/worker.rb in resque-1.26.pre.0 vs lib/resque/worker.rb in resque-1.26.0
- old
+ new
@@ -8,12 +8,16 @@
# memory growth as well as low level failures.
#
# It also ensures workers are always listening to signals from you,
# their master, and can react accordingly.
class Worker
+ include Resque::Helpers
+ extend Resque::Helpers
include Resque::Logging
+ WORKER_HEARTBEAT_KEY = "workers:heartbeat"
+
def redis
Resque.redis
end
def self.redis
@@ -21,50 +25,37 @@
end
# Given a Ruby object, returns a string suitable for storage in a
# queue.
def encode(object)
- if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
- MultiJson.dump object
- else
- MultiJson.encode object
- end
+ Resque.encode(object)
end
# Given a string, returns a Ruby object.
def decode(object)
- return unless object
-
- begin
- if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
- MultiJson.load object
- else
- MultiJson.decode object
- end
- rescue ::MultiJson::DecodeError => e
- raise DecodeException, e.message, e.backtrace
- end
+ Resque.decode(object)
end
- # Boolean indicating whether this worker can or can not fork.
- # Automatically set if a fork(2) fails.
- attr_accessor :cant_fork
-
attr_accessor :term_timeout
# decide whether to use new_kill_child logic
attr_accessor :term_child
+ # Should SIGTERM shut workers down gracefully (vs. immediately)?
+ # When set, makes SIGTERM work like SIGQUIT.
+ attr_accessor :graceful_term
+
# When set to true, forked workers will exit with `exit`, calling any `at_exit` code handlers that have been
# registered in the application. Otherwise, forked workers exit with `exit!`
attr_accessor :run_at_exit_hooks
attr_writer :to_s
+ attr_writer :pid
# Returns an array of all worker objects.
def self.all
- Array(redis.smembers(:workers)).map { |id| find(id) }.compact
+ Array(redis.smembers(:workers)).map { |id| find(id, :skip_exists => true) }.compact
end
# Returns an array of all worker objects currently processing
# jobs.
def self.working
@@ -85,20 +76,26 @@
reportedly_working[name] = value unless value.nil? || value.empty?
end
end
reportedly_working.keys.map do |key|
- find key.sub("worker:", '')
+ worker = find(key.sub("worker:", ''), :skip_exists => true)
+ worker.job = worker.decode(reportedly_working[key])
+ worker
end.compact
end
# Returns a single worker object. Accepts a string id.
- def self.find(worker_id)
- if exists? worker_id
- queues = worker_id.split(':')[-1].split(',')
+ def self.find(worker_id, options = {})
+ skip_exists = options[:skip_exists]
+
+ if skip_exists || exists?(worker_id)
+ host, pid, queues_raw = worker_id.split(':')
+ queues = queues_raw.split(',')
worker = new(*queues)
worker.to_s = worker_id
+ worker.pid = pid.to_i
worker
else
nil
end
end
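# A hypothetical usage sketch of the new :skip_exists option (the worker id
# below is made up): callers that already know the id is valid, such as
# Worker.all above, can now skip the exists? round trip to Redis.
worker = Resque::Worker.find("web01:4242:high,low", :skip_exists => true)
worker.pid    # => 4242
worker.queues # => ["high", "low"]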
@@ -124,13 +121,42 @@
#
# If passed a single "*", this Worker will operate on all queues
# in alphabetical order. Queues can be dynamically added or
# removed without needing to restart workers using this method.
def initialize(*queues)
- @queues = queues.map { |queue| queue.to_s.strip }
@shutdown = nil
@paused = nil
+ @before_first_fork_hook_ran = false
+
+ self.verbose = ENV['LOGGING'] || ENV['VERBOSE']
+ self.very_verbose = ENV['VVERBOSE']
+ self.term_timeout = ENV['RESQUE_TERM_TIMEOUT'] || 4.0
+ self.term_child = ENV['TERM_CHILD']
+ self.graceful_term = ENV['GRACEFUL_TERM']
+ self.run_at_exit_hooks = ENV['RUN_AT_EXIT_HOOKS']
+
+ if ENV['BACKGROUND']
+ unless Process.respond_to?('daemon')
+ abort "env var BACKGROUND is set, which requires ruby >= 1.9"
+ end
+ Process.daemon(true)
+ self.reconnect
+ end
+
+ if ENV['PIDFILE']
+ File.open(ENV['PIDFILE'], 'w') { |f| f << pid }
+ end
+
+ self.queues = queues
+ end
+
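# Most worker configuration is now read from the environment in the
# constructor above. An illustrative invocation via the standard
# `rake resque:work` task (values are examples, not defaults):
#
#   QUEUES=high,low TERM_CHILD=1 RESQUE_TERM_TIMEOUT=10 \
#     BACKGROUND=yes PIDFILE=tmp/resque.pid VVERBOSE=1 rake resque:work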
+ def queues=(queues)
+ queues = queues.empty? ? (ENV["QUEUES"] || ENV['QUEUE']).to_s.split(',') : queues
+ @queues = queues.map { |queue| queue.to_s.strip }
+ unless ['*', '?', '{', '}', '[', ']'].any? {|char| @queues.join.include?(char) }
+ @static_queues = @queues.flatten.uniq
+ end
validate_queues
end
# A worker must be given a queue, otherwise it won't know what to
# do with itself.
@@ -140,10 +166,24 @@
if @queues.nil? || @queues.empty?
raise NoQueueError.new("Please give each worker at least one queue.")
end
end
+ # Returns a list of queues to use when searching for a job.
+ # A splat ("*") means you want every queue (in alpha order) - this
+ # can be useful for dynamically adding new queues.
+ def queues
+ return @static_queues if @static_queues
+ @queues.map { |queue| glob_match(queue) }.flatten.uniq
+ end
+
+ def glob_match(pattern)
+ Resque.queues.select do |queue|
+ File.fnmatch?(pattern, queue)
+ end.sort
+ end
+
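# A hypothetical sketch of the queue globbing above (queue names are made up;
# patterns are matched against Resque.queues, i.e. the live queue list in Redis):
worker = Resque::Worker.new("critical", "mailers_*")
worker.queues # e.g. ["critical", "mailers_bulk", "mailers_retry"], provided
              # those queues currently exist; a lone "*" matches every queue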
# This is the main workhorse method. Called on a Worker instance,
# it begins the worker life cycle.
#
# The following events occur during a worker's life cycle:
#
@@ -165,11 +205,11 @@
loop do
break if shutdown?
if not paused? and job = reserve
- log "got: #{job.inspect}"
+ log_with_severity :info, "got: #{job.inspect}"
job.worker = self
working_on job
procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"
if @child = fork(job)
@@ -178,16 +218,16 @@
begin
Process.waitpid(@child)
rescue SystemCallError
nil
end
- job.fail(DirtyExit.new($?.to_s)) if $?.signaled?
+ job.fail(DirtyExit.new("Child process received unhandled signal #{$?.stopsig}")) if $?.signaled?
else
unregister_signal_handlers if will_fork? && term_child
begin
- reconnect
+ reconnect if will_fork?
perform(job, &block)
rescue Exception => exception
report_failed_job(job,exception)
end
@@ -199,20 +239,20 @@
done_working
@child = nil
else
break if interval.zero?
- log! "Sleeping for #{interval} seconds"
- procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
+ log_with_severity :debug, "Sleeping for #{interval} seconds"
+ procline paused? ? "Paused" : "Waiting for #{queues.join(',')}"
sleep interval
end
end
unregister_worker
rescue Exception => exception
unless exception.class == SystemExit && !@child && run_at_exit_hooks
- log "Failed to start worker : #{exception.inspect}"
+ log_with_severity :error, "Failed to start worker : #{exception.inspect}"
unregister_worker(exception)
end
end
@@ -228,20 +268,20 @@
done_working
end
# Reports the exception and marks the job as failed
def report_failed_job(job,exception)
- log "#{job.inspect} failed: #{exception.inspect}"
+ log_with_severity :error, "#{job.inspect} failed: #{exception.inspect}"
begin
job.fail(exception)
rescue Object => exception
- log "Received exception when reporting failure: #{exception.inspect}"
+ log_with_severity :error, "Received exception when reporting failure: #{exception.inspect}"
end
begin
failed!
rescue Object => exception
- log "Received exception when increasing failed jobs counter (redis issue) : #{exception.inspect}"
+ log_with_severity :error, "Received exception when increasing failed jobs counter (redis issue) : #{exception.inspect}"
end
end
# Processes a given job in the child.
def perform(job)
@@ -249,31 +289,31 @@
run_hook :after_fork, job if will_fork?
job.perform
rescue Object => e
report_failed_job(job,e)
else
- log "done: #{job.inspect}"
+ log_with_severity :info, "done: #{job.inspect}"
ensure
yield job if block_given?
end
end
# Attempts to grab a job off one of the provided queues. Returns
# nil if no job can be found.
def reserve
queues.each do |queue|
- log! "Checking #{queue}"
+ log_with_severity :debug, "Checking #{queue}"
if job = Resque.reserve(queue)
- log! "Found job on #{queue}"
+ log_with_severity :debug, "Found job on #{queue}"
return job
end
end
nil
rescue Exception => e
- log "Error reserving job: #{e.inspect}"
- log e.backtrace.join("\n")
+ log_with_severity :error, "Error reserving job: #{e.inspect}"
+ log_with_severity :error, e.backtrace.join("\n")
raise e
end
# Reconnect to Redis to avoid sharing a connection with the parent,
# retry up to 3 times with increasing delay before giving up.
@@ -281,53 +321,33 @@
tries = 0
begin
redis.client.reconnect
rescue Redis::BaseConnectionError
if (tries += 1) <= 3
- log "Error reconnecting to Redis; retrying"
+ log_with_severity :error, "Error reconnecting to Redis; retrying"
sleep(tries)
retry
else
- log "Error reconnecting to Redis; quitting"
+ log_with_severity :error, "Error reconnecting to Redis; quitting"
raise
end
end
end
- # Returns a list of queues to use when searching for a job.
- # A splat ("*") means you want every queue (in alpha order) - this
- # can be useful for dynamically adding new queues.
- def queues
- @queues.map do |queue|
- queue.strip!
- if (matched_queues = glob_match(queue)).empty?
- queue
- else
- matched_queues
- end
- end.flatten.uniq
- end
-
- def glob_match(pattern)
- Resque.queues.select do |queue|
- File.fnmatch?(pattern, queue)
- end.sort
- end
-
# Not every platform supports fork. Here we do our magic to
# determine if yours does.
def fork(job)
- return if @cant_fork
+ return unless will_fork?
# Only run before_fork hooks if we're actually going to fork
- # (after checking @cant_fork)
+ # (after checking will_fork?)
run_hook :before_fork, job
begin
# IronRuby doesn't support `Kernel.fork` yet
if Kernel.respond_to?(:fork)
- Kernel.fork if will_fork?
+ Kernel.fork
else
raise NotImplementedError
end
rescue NotImplementedError
@cant_fork = true
@@ -335,13 +355,13 @@
end
end
# Runs all the methods needed when a worker begins its lifecycle.
def startup
- Kernel.warn "WARNING: This way of doing signal handling is now deprecated. Please see http://hone.heroku.com/resque/2012/08/21/resque-signals.html for more info." unless term_child or $TESTING
enable_gc_optimizations
register_signal_handlers
+ start_heartbeat
prune_dead_workers
run_hook :before_first_fork
register_worker
# Fix buffering so we can `rake resque:work > resque.log` and
@@ -364,11 +384,11 @@
# QUIT: Shutdown after the current job has finished processing.
# USR1: Kill the forked child immediately, continue processing jobs.
# USR2: Don't process any new jobs
# CONT: Start processing jobs again after a USR2
def register_signal_handlers
- trap('TERM') { shutdown! }
+ trap('TERM') { graceful_term ? shutdown : shutdown! }
trap('INT') { shutdown! }
begin
trap('QUIT') { shutdown }
if term_child
@@ -377,23 +397,23 @@
trap('USR1') { kill_child }
end
trap('USR2') { pause_processing }
trap('CONT') { unpause_processing }
rescue ArgumentError
- warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
+ log_with_severity :warn, "Signals QUIT, USR1, USR2, and/or CONT not supported."
end
- log! "Registered signals"
+ log_with_severity :debug, "Registered signals"
end
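# Illustrative only (the pid is hypothetical): with the handlers registered
# above, a deploy script could, for example, do:
#
#   Process.kill("QUIT", 4242) # finish the current job, then shut down
#   Process.kill("TERM", 4242) # shut down now (graceful if GRACEFUL_TERM is set)
#   Process.kill("USR2", 4242) # stop picking up new jobs until CONT arrives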
def unregister_signal_handlers
trap('TERM') do
- trap ('TERM') do
- # ignore subsequent terms
- end
- raise TermException.new("SIGTERM")
- end
+ trap ('TERM') do
+ # ignore subsequent terms
+ end
+ raise TermException.new("SIGTERM")
+ end
trap('INT', 'DEFAULT')
begin
trap('QUIT', 'DEFAULT')
trap('USR1', 'DEFAULT')
@@ -403,19 +423,28 @@
end
# Schedule this worker for shutdown. Will finish processing the
# current job.
def shutdown
- log 'Exiting...'
+ log_with_severity :info, 'Exiting...'
@shutdown = true
end
# Kill the child and shutdown immediately.
+ # If not forking, abort this process.
def shutdown!
shutdown
if term_child
- new_kill_child
+ if fork_per_job?
+ new_kill_child
+ else
+ # Raise TermException in the same process
+ trap('TERM') do
+ # ignore subsequent terms
+ end
+ raise TermException.new("SIGTERM")
+ end
else
kill_child
end
end
@@ -426,58 +455,102 @@
# Kills the forked child immediately, without remorse. The job it
# is processing will not be completed.
def kill_child
if @child
- log! "Killing child at #{@child}"
+ log_with_severity :debug, "Killing child at #{@child}"
if `ps -o pid,state -p #{@child}`
Process.kill("KILL", @child) rescue nil
else
- log! "Child #{@child} not found, restarting."
+ log_with_severity :debug, "Child #{@child} not found, restarting."
shutdown
end
end
end
+ def heartbeat
+ heartbeat = redis.hget(WORKER_HEARTBEAT_KEY, to_s)
+ heartbeat && Time.parse(heartbeat)
+ end
+
+ def self.all_heartbeats
+ redis.hgetall(WORKER_HEARTBEAT_KEY)
+ end
+
+ # Returns a list of workers that have sent a heartbeat in the past but whose
+ # most recent heartbeat has expired (does NOT include workers that have never
+ # sent a heartbeat at all).
+ def self.all_workers_with_expired_heartbeats
+ workers = Worker.all
+ heartbeats = Worker.all_heartbeats
+
+ workers.select do |worker|
+ id = worker.to_s
+ heartbeat = heartbeats[id]
+
+ if heartbeat
+ seconds_since_heartbeat = (Time.now - Time.parse(heartbeat)).to_i
+ seconds_since_heartbeat > Resque.prune_interval
+ else
+ false
+ end
+ end
+ end
+
+ def heartbeat!(time = Time.now)
+ redis.hset(WORKER_HEARTBEAT_KEY, to_s, time.iso8601)
+ end
+
+ def start_heartbeat
+ heartbeat!
+ @heart = Thread.new do
+ loop do
+ sleep(Resque.heartbeat_interval)
+ heartbeat!
+ end
+ end
+ end
+
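# A minimal monitoring sketch using the heartbeat API above (worker id and
# timestamp are made up; Resque.heartbeat_interval and Resque.prune_interval
# are assumed to be configured):
Resque::Worker.all_heartbeats
# => { "web01:4242:high,low" => "2015-06-01T12:00:00Z" }
Resque::Worker.all_workers_with_expired_heartbeats.each do |worker|
  # the same cleanup prune_dead_workers performs for stale entries
  worker.unregister_worker(Resque::PruneDeadWorkerDirtyExit.new(worker.to_s))
end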
# Kills the forked child immediately with minimal remorse. The job it
# is processing will not be completed. Send the child a TERM signal,
# wait up to `term_timeout` seconds, and then a KILL signal if it has not quit.
def new_kill_child
if @child
unless Process.waitpid(@child, Process::WNOHANG)
- log! "Sending TERM signal to child #{@child}"
+ log_with_severity :debug, "Sending TERM signal to child #{@child}"
Process.kill("TERM", @child)
(term_timeout.to_f * 10).round.times do |i|
sleep(0.1)
return if Process.waitpid(@child, Process::WNOHANG)
end
- log! "Sending KILL signal to child #{@child}"
+ log_with_severity :debug, "Sending KILL signal to child #{@child}"
Process.kill("KILL", @child)
else
- log! "Child #{@child} already quit."
+ log_with_severity :debug, "Child #{@child} already quit."
end
end
rescue SystemCallError
- log! "Child #{@child} already quit and reaped."
+ log_with_severity :error, "Child #{@child} already quit and reaped."
end
# are we paused?
def paused?
@paused
end
# Stop processing jobs after the current one has completed (if we're
# currently running one).
def pause_processing
- log "USR2 received; pausing job processing"
+ log_with_severity :info, "USR2 received; pausing job processing"
+ run_hook :before_pause, self
@paused = true
end
# Start processing jobs again after a pause
def unpause_processing
- log "CONT received; resuming job processing"
+ log_with_severity :info, "CONT received; resuming job processing"
@paused = false
+ run_hook :after_pause, self
end
# Looks for any workers which should be running on this server
# and, if they're not, removes them from Redis.
#
@@ -488,25 +561,44 @@
#
# By checking the current Redis state against the actual
# environment, we can determine if Redis is old and clean it up a bit.
def prune_dead_workers
all_workers = Worker.all
- known_workers = worker_pids unless all_workers.empty?
+
+ unless all_workers.empty?
+ known_workers = worker_pids
+ all_workers_with_expired_heartbeats = Worker.all_workers_with_expired_heartbeats
+ end
+
all_workers.each do |worker|
+ # If the worker hasn't sent a heartbeat, remove it from the registry.
+ #
+ # If the worker hasn't ever sent a heartbeat, we won't remove it: since
+ # the first heartbeat is sent before the worker is registered, a missing
+ # heartbeat means this is a worker that doesn't support heartbeats, e.g.,
+ # another client library or an older version of Resque. We won't touch these.
+ if all_workers_with_expired_heartbeats.include?(worker)
+ log_with_severity :info, "Pruning dead worker: #{worker}"
+ worker.unregister_worker(PruneDeadWorkerDirtyExit.new(worker.to_s))
+ next
+ end
+
host, pid, worker_queues_raw = worker.id.split(':')
worker_queues = worker_queues_raw.split(",")
unless @queues.include?("*") || (worker_queues.to_set == @queues.to_set)
# If the worker we are trying to prune does not belong to the queues
- # we are listening to, we should not touch it.
+ # we are listening to, we should not touch it.
# Attempt to prune a worker from different queues may easily result in
- # an unknown class exception, since that worker could easily be even
+ # an unknown class exception, since that worker could easily be even
# written in different language.
next
end
+
next unless host == hostname
next if known_workers.include?(pid)
- log! "Pruning dead worker: #{worker}"
+
+ log_with_severity :debug, "Pruning dead worker: #{worker}"
worker.unregister_worker
end
end
# Registers ourself as a worker. Useful when entering the worker
@@ -519,39 +611,61 @@
end
# Runs a named hook, passing along any arguments.
def run_hook(name, *args)
return unless hooks = Resque.send(name)
+ return if name == :before_first_fork && @before_first_fork_hook_ran
msg = "Running #{name} hooks"
msg << " with #{args.inspect}" if args.any?
- log msg
+ log_with_severity :info, msg
hooks.each do |hook|
args.any? ? hook.call(*args) : hook.call
+ @before_first_fork_hook_ran = true if name == :before_first_fork
end
end
+ def kill_background_threads
+ @heart.kill if @heart
+ end
+
# Unregisters ourself as a worker. Useful when shutting down.
def unregister_worker(exception = nil)
# If we're still processing a job, make sure it gets logged as a
# failure.
if (hash = processing) && !hash.empty?
job = Job.new(hash['queue'], hash['payload'])
# Ensure the proper worker is attached to this job, even if
# it's not the precise instance that died.
job.worker = self
- job.fail(exception || DirtyExit.new)
+ begin
+ job.fail(exception || DirtyExit.new("Job still being processed"))
+ rescue RuntimeError => e
+ log_with_severity :error, e.message
+ end
end
+ kill_background_threads
+
redis.pipelined do
redis.srem(:workers, self)
redis.del("worker:#{self}")
redis.del("worker:#{self}:started")
+ redis.hdel(WORKER_HEARTBEAT_KEY, self.to_s)
Stat.clear("processed:#{self}")
Stat.clear("failed:#{self}")
end
+ rescue Exception => exception_while_unregistering
+ message = exception_while_unregistering.message
+ if exception
+ message = message + "\nOriginal Exception (#{exception.class}): #{exception.message}\n" +
+ " #{exception.backtrace.join(" \n")}"
+ end
+ fail(exception_while_unregistering.class,
+ message,
+ exception_while_unregistering.backtrace)
end
# Given a job, tells Redis we're working on it. Useful for seeing
# what workers are doing and when.
def working_on(job)
@@ -602,13 +716,15 @@
def started!
redis.set("worker:#{self}:started", Time.now.to_s)
end
# Returns a hash explaining the Job we're currently processing, if any.
- def job
- decode(redis.get("worker:#{self}")) || {}
+ def job(reload = true)
+ @job = nil if reload
+ @job ||= decode(redis.get("worker:#{self}")) || {}
end
+ attr_writer :job
alias_method :processing, :job
# Boolean - true if working, false if not
def working?
state == :working
@@ -618,13 +734,17 @@
def idle?
state == :idle
end
def will_fork?
- !@cant_fork && !$TESTING && (ENV["FORK_PER_JOB"] != 'false')
+ !@cant_fork && fork_per_job?
end
+ def fork_per_job?
+ ENV["FORK_PER_JOB"] != 'false'
+ end
+
# Returns a symbol representing the current worker state,
# which can be either :working or :idle
def state
redis.exists("worker:#{self}") ? :working : :idle
end
@@ -675,11 +795,11 @@
end
# Find Resque worker pids on Linux and OS X.
#
def linux_worker_pids
- `ps -A -o pid,command | grep "[r]esque" | grep -v "resque-web"`.split("\n").map do |line|
+ `ps -A -o pid,command | grep -E "[r]esque:work|[r]esque-[0-9]" | grep -v "resque-web"`.split("\n").map do |line|
line.split(' ')[0]
end
end
# Find Resque worker pids on Solaris.
@@ -696,71 +816,60 @@
end.compact
end
# Given a string, sets the procline ($0) and logs.
# Procline is always in the format of:
- # resque-VERSION: STRING
+ # RESQUE_PROCLINE_PREFIXresque-VERSION: STRING
def procline(string)
- $0 = "resque-#{Resque::Version}: #{string}"
- log! $0
+ $0 = "#{ENV['RESQUE_PROCLINE_PREFIX']}resque-#{Resque::Version}: #{string}"
+ log_with_severity :debug, $0
end
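# Hypothetical example of the prefixed procline: with
# ENV['RESQUE_PROCLINE_PREFIX'] = 'myapp-', `ps` would show something like
# "myapp-resque-1.26.0: Waiting for high,low".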
- # Log a message to Resque.logger
- # can't use alias_method since info/debug are private methods
def log(message)
info(message)
end
def log!(message)
debug(message)
end
- # Deprecated legacy methods for controlling the logging threshhold
- # Use Resque.logger.level now, e.g.:
- #
- # Resque.logger.level = Logger::DEBUG
- #
+
def verbose
- logger_severity_deprecation_warning
@verbose
end
def very_verbose
- logger_severity_deprecation_warning
@very_verbose
end
def verbose=(value);
- logger_severity_deprecation_warning
-
if value && !very_verbose
Resque.logger.formatter = VerboseFormatter.new
+ Resque.logger.level = Logger::INFO
elsif !value
Resque.logger.formatter = QuietFormatter.new
end
@verbose = value
end
def very_verbose=(value)
- logger_severity_deprecation_warning
if value
Resque.logger.formatter = VeryVerboseFormatter.new
+ Resque.logger.level = Logger::DEBUG
elsif !value && verbose
Resque.logger.formatter = VerboseFormatter.new
+ Resque.logger.level = Logger::INFO
else
Resque.logger.formatter = QuietFormatter.new
end
@very_verbose = value
end
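# Sketch of the logger level mapping now applied by the two setters above:
#
#   VVERBOSE set              -> VeryVerboseFormatter, Logger::DEBUG
#   VERBOSE set (no VVERBOSE) -> VerboseFormatter,     Logger::INFO
#   neither                   -> QuietFormatter (level left unchanged)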
- def logger_severity_deprecation_warning
- return if $TESTING
- return if $warned_logger_severity_deprecation
- Kernel.warn "*** DEPRECATION WARNING: Resque::Worker#verbose and #very_verbose are deprecated. Please set Resque.logger.level instead"
- Kernel.warn "Called from: #{caller[0..5].join("\n\t")}"
- $warned_logger_severity_deprecation = true
- nil
+ private
+
+ def log_with_severity(severity, message)
+ Logging.log(severity, message)
end
end
end