lib/resque/worker.rb in resque-1.27.4 vs lib/resque/worker.rb in resque-2.0.0

- old
+ new

@@ -143,10 +143,13 @@ def initialize(*queues) @shutdown = nil @paused = nil @before_first_fork_hook_ran = false + @heartbeat_thread = nil + @heartbeat_thread_signal = nil + verbose_value = ENV['LOGGING'] || ENV['VERBOSE'] self.verbose = verbose_value if verbose_value self.very_verbose = ENV['VVERBOSE'] if ENV['VVERBOSE'] self.pre_shutdown_timeout = (ENV['RESQUE_PRE_SHUTDOWN_TIMEOUT'] || 0.0).to_f self.term_timeout = (ENV['RESQUE_TERM_TIMEOUT'] || 4.0).to_f @@ -160,29 +163,26 @@ # Daemonizes the worker if ENV['BACKGROUND'] is set and writes # the process id to ENV['PIDFILE'] if set. Should only be called # once per worker. def prepare if ENV['BACKGROUND'] - unless Process.respond_to?('daemon') - abort "env var BACKGROUND is set, which requires ruby >= 1.9" - end Process.daemon(true) end if ENV['PIDFILE'] File.open(ENV['PIDFILE'], 'w') { |f| f << pid } end self.reconnect if ENV['BACKGROUND'] end + WILDCARDS = ['*', '?', '{', '}', '[', ']'].freeze + def queues=(queues) queues = queues.empty? ? (ENV["QUEUES"] || ENV['QUEUE']).to_s.split(',') : queues @queues = queues.map { |queue| queue.to_s.strip } - unless ['*', '?', '{', '}', '[', ']'].any? {|char| @queues.join.include?(char) } - @static_queues = @queues.flatten.uniq - end + @has_dynamic_queues = WILDCARDS.any? {|char| @queues.join.include?(char) } validate_queues end # A worker must be given a queue, otherwise it won't know what to # do with itself. @@ -196,16 +196,20 @@ # Returns a list of queues to use when searching for a job. # A splat ("*") means you want every queue (in alpha order) - this # can be useful for dynamically adding new queues. def queues - return @static_queues if @static_queues - @queues.map { |queue| glob_match(queue) }.flatten.uniq + if @has_dynamic_queues + current_queues = Resque.queues + @queues.map { |queue| glob_match(current_queues, queue) }.flatten.uniq + else + @queues + end end - def glob_match(pattern) - Resque.queues.select do |queue| + def glob_match(list, pattern) + list.select do |queue| File.fnmatch?(pattern, queue) end.sort end # This is the main workhorse method. Called on a Worker instance, @@ -586,10 +590,12 @@ # will leave stale state information in Redis. # # By checking the current Redis state against the actual # environment, we can determine if Redis is old and clean it up a bit. def prune_dead_workers + return unless data_store.acquire_pruning_dead_worker_lock(self, Resque.heartbeat_interval) + all_workers = Worker.all unless all_workers.empty? known_workers = worker_pids all_workers_with_expired_heartbeats = Worker.all_workers_with_expired_heartbeats @@ -602,11 +608,13 @@ # the first heartbeat is sent before the worker is registred it means # that this is a worker that doesn't support heartbeats, e.g., another # client library or an older version of Resque. We won't touch these. if all_workers_with_expired_heartbeats.include?(worker) log_with_severity :info, "Pruning dead worker: #{worker}" - worker.unregister_worker(PruneDeadWorkerDirtyExit.new(worker.to_s)) + + job_class = worker.job(false)['payload']['class'] rescue nil + worker.unregister_worker(PruneDeadWorkerDirtyExit.new(worker.to_s, job_class)) next end host, pid, worker_queues_raw = worker.id.split(':') worker_queues = worker_queues_raw.split(",") @@ -633,11 +641,12 @@ data_store.register_worker(self) end # Runs a named hook, passing along any arguments. def run_hook(name, *args) - return unless hooks = Resque.send(name) + hooks = Resque.send(name) + return if hooks.empty? return if name == :before_first_fork && @before_first_fork_hook_ran msg = "Running #{name} hooks" msg << " with #{args.inspect}" if args.any? log_with_severity :info, msg @@ -806,11 +815,11 @@ # Returns an Array of string pids of all the other workers on this # machine. Useful when pruning dead workers on startup. def windows_worker_pids tasklist_output = `tasklist /FI "IMAGENAME eq ruby.exe" /FO list`.encode("UTF-8", Encoding.locale_charmap) - tasklist_output.split($/).select { |line| line =~ /^PID:/}.collect{ |line| line.gsub /PID:\s+/, '' } + tasklist_output.split($/).select { |line| line =~ /^PID:/ }.collect { |line| line.gsub(/PID:\s+/, '') } end # Find Resque worker pids on Linux and OS X. # def linux_worker_pids @@ -848,18 +857,12 @@ def log!(message) debug(message) end - def verbose - @verbose - end + attr_reader :verbose, :very_verbose - def very_verbose - @very_verbose - end - def verbose=(value); if value && !very_verbose Resque.logger.formatter = VerboseFormatter.new Resque.logger.level = Logger::INFO elsif !value @@ -907,10 +910,10 @@ Process.waitpid(@child) rescue SystemCallError nil end - job.fail(DirtyExit.new("Child process received unhandled signal #{$?.stopsig}", $?)) if $?.signaled? + job.fail(DirtyExit.new("Child process received unhandled signal #{$?}", $?)) if $?.signaled? @child = nil end def log_with_severity(severity, message) Logging.log(severity, message)