lib/resque/worker.rb in resque-1.27.4 vs lib/resque/worker.rb in resque-2.0.0
- old
+ new
@@ -143,10 +143,13 @@
def initialize(*queues)
@shutdown = nil
@paused = nil
@before_first_fork_hook_ran = false
+ @heartbeat_thread = nil
+ @heartbeat_thread_signal = nil
+
verbose_value = ENV['LOGGING'] || ENV['VERBOSE']
self.verbose = verbose_value if verbose_value
self.very_verbose = ENV['VVERBOSE'] if ENV['VVERBOSE']
self.pre_shutdown_timeout = (ENV['RESQUE_PRE_SHUTDOWN_TIMEOUT'] || 0.0).to_f
self.term_timeout = (ENV['RESQUE_TERM_TIMEOUT'] || 4.0).to_f
@@ -160,29 +163,26 @@
# Daemonizes the worker if ENV['BACKGROUND'] is set and writes
# the process id to ENV['PIDFILE'] if set. Should only be called
# once per worker.
def prepare
if ENV['BACKGROUND']
- unless Process.respond_to?('daemon')
- abort "env var BACKGROUND is set, which requires ruby >= 1.9"
- end
Process.daemon(true)
end
if ENV['PIDFILE']
File.open(ENV['PIDFILE'], 'w') { |f| f << pid }
end
self.reconnect if ENV['BACKGROUND']
end
+ WILDCARDS = ['*', '?', '{', '}', '[', ']'].freeze
+
def queues=(queues)
queues = queues.empty? ? (ENV["QUEUES"] || ENV['QUEUE']).to_s.split(',') : queues
@queues = queues.map { |queue| queue.to_s.strip }
- unless ['*', '?', '{', '}', '[', ']'].any? {|char| @queues.join.include?(char) }
- @static_queues = @queues.flatten.uniq
- end
+ @has_dynamic_queues = WILDCARDS.any? {|char| @queues.join.include?(char) }
validate_queues
end
# A worker must be given a queue, otherwise it won't know what to
# do with itself.
@@ -196,16 +196,20 @@
# Returns a list of queues to use when searching for a job.
# A splat ("*") means you want every queue (in alpha order) - this
# can be useful for dynamically adding new queues.
def queues
- return @static_queues if @static_queues
- @queues.map { |queue| glob_match(queue) }.flatten.uniq
+ if @has_dynamic_queues
+ current_queues = Resque.queues
+ @queues.map { |queue| glob_match(current_queues, queue) }.flatten.uniq
+ else
+ @queues
+ end
end
- def glob_match(pattern)
- Resque.queues.select do |queue|
+ def glob_match(list, pattern)
+ list.select do |queue|
File.fnmatch?(pattern, queue)
end.sort
end
# This is the main workhorse method. Called on a Worker instance,
@@ -586,10 +590,12 @@
# will leave stale state information in Redis.
#
# By checking the current Redis state against the actual
# environment, we can determine if Redis is old and clean it up a bit.
def prune_dead_workers
+ return unless data_store.acquire_pruning_dead_worker_lock(self, Resque.heartbeat_interval)
+
all_workers = Worker.all
unless all_workers.empty?
known_workers = worker_pids
all_workers_with_expired_heartbeats = Worker.all_workers_with_expired_heartbeats
@@ -602,11 +608,13 @@
# the first heartbeat is sent before the worker is registred it means
# that this is a worker that doesn't support heartbeats, e.g., another
# client library or an older version of Resque. We won't touch these.
if all_workers_with_expired_heartbeats.include?(worker)
log_with_severity :info, "Pruning dead worker: #{worker}"
- worker.unregister_worker(PruneDeadWorkerDirtyExit.new(worker.to_s))
+
+ job_class = worker.job(false)['payload']['class'] rescue nil
+ worker.unregister_worker(PruneDeadWorkerDirtyExit.new(worker.to_s, job_class))
next
end
host, pid, worker_queues_raw = worker.id.split(':')
worker_queues = worker_queues_raw.split(",")
@@ -633,11 +641,12 @@
data_store.register_worker(self)
end
# Runs a named hook, passing along any arguments.
def run_hook(name, *args)
- return unless hooks = Resque.send(name)
+ hooks = Resque.send(name)
+ return if hooks.empty?
return if name == :before_first_fork && @before_first_fork_hook_ran
msg = "Running #{name} hooks"
msg << " with #{args.inspect}" if args.any?
log_with_severity :info, msg
@@ -806,11 +815,11 @@
# Returns an Array of string pids of all the other workers on this
# machine. Useful when pruning dead workers on startup.
def windows_worker_pids
tasklist_output = `tasklist /FI "IMAGENAME eq ruby.exe" /FO list`.encode("UTF-8", Encoding.locale_charmap)
- tasklist_output.split($/).select { |line| line =~ /^PID:/}.collect{ |line| line.gsub /PID:\s+/, '' }
+ tasklist_output.split($/).select { |line| line =~ /^PID:/ }.collect { |line| line.gsub(/PID:\s+/, '') }
end
# Find Resque worker pids on Linux and OS X.
#
def linux_worker_pids
@@ -848,18 +857,12 @@
def log!(message)
debug(message)
end
- def verbose
- @verbose
- end
+ attr_reader :verbose, :very_verbose
- def very_verbose
- @very_verbose
- end
-
def verbose=(value);
if value && !very_verbose
Resque.logger.formatter = VerboseFormatter.new
Resque.logger.level = Logger::INFO
elsif !value
@@ -907,10 +910,10 @@
Process.waitpid(@child)
rescue SystemCallError
nil
end
- job.fail(DirtyExit.new("Child process received unhandled signal #{$?.stopsig}", $?)) if $?.signaled?
+ job.fail(DirtyExit.new("Child process received unhandled signal #{$?}", $?)) if $?.signaled?
@child = nil
end
def log_with_severity(severity, message)
Logging.log(severity, message)