lib/resque/worker.rb in resque-1.26.pre.0 vs lib/resque/worker.rb in resque-1.26.0

- old
+ new

@@ -8,12 +8,16 @@
  # memory growth as well as low level failures.
  #
  # It also ensures workers are always listening to signals from you,
  # their master, and can react accordingly.
  class Worker
+   include Resque::Helpers
+   extend Resque::Helpers
    include Resque::Logging

+   WORKER_HEARTBEAT_KEY = "workers:heartbeat"
+
    def redis
      Resque.redis
    end

    def self.redis
@@ -21,50 +25,37 @@
    end

    # Given a Ruby object, returns a string suitable for storage in a
    # queue.
    def encode(object)
-     if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
-       MultiJson.dump object
-     else
-       MultiJson.encode object
-     end
+     Resque.encode(object)
    end

    # Given a string, returns a Ruby object.
    def decode(object)
-     return unless object
-
-     begin
-       if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
-         MultiJson.load object
-       else
-         MultiJson.decode object
-       end
-     rescue ::MultiJson::DecodeError => e
-       raise DecodeException, e.message, e.backtrace
-     end
+     Resque.decode(object)
    end

-   # Boolean indicating whether this worker can or can not fork.
-   # Automatically set if a fork(2) fails.
-   attr_accessor :cant_fork
-
    attr_accessor :term_timeout

    # decide whether to use new_kill_child logic
    attr_accessor :term_child

+   # should term kill workers gracefully (vs. immediately)
+   # Makes SIGTERM work like SIGQUIT
+   attr_accessor :graceful_term
+
    # When set to true, forked workers will exit with `exit`, calling any `at_exit` code handlers that have been
    # registered in the application. Otherwise, forked workers exit with `exit!`
    attr_accessor :run_at_exit_hooks

    attr_writer :to_s
+   attr_writer :pid

    # Returns an array of all worker objects.
    def self.all
-     Array(redis.smembers(:workers)).map { |id| find(id) }.compact
+     Array(redis.smembers(:workers)).map { |id| find(id, :skip_exists => true) }.compact
    end

    # Returns an array of all worker objects currently processing
    # jobs.
    def self.working
@@ -85,20 +76,26 @@
          reportedly_working[name] = value unless value.nil? || value.empty?
        end
      end

      reportedly_working.keys.map do |key|
-       find key.sub("worker:", '')
+       worker = find(key.sub("worker:", ''), :skip_exists => true)
+       worker.job = worker.decode(reportedly_working[key])
+       worker
      end.compact
    end

    # Returns a single worker object. Accepts a string id.
-   def self.find(worker_id)
-     if exists? worker_id
-       queues = worker_id.split(':')[-1].split(',')
+   def self.find(worker_id, options = {})
+     skip_exists = options[:skip_exists]
+
+     if skip_exists || exists?(worker_id)
+       host, pid, queues_raw = worker_id.split(':')
+       queues = queues_raw.split(',')
        worker = new(*queues)
        worker.to_s = worker_id
+       worker.pid = pid.to_i
        worker
      else
        nil
      end
    end
@@ -124,13 +121,42 @@
    #
    # If passed a single "*", this Worker will operate on all queues
    # in alphabetical order. Queues can be dynamically added or
    # removed without needing to restart workers using this method.
    def initialize(*queues)
-     @queues = queues.map { |queue| queue.to_s.strip }
      @shutdown = nil
      @paused = nil
+     @before_first_fork_hook_ran = false
+
+     self.verbose = ENV['LOGGING'] || ENV['VERBOSE']
+     self.very_verbose = ENV['VVERBOSE']
+     self.term_timeout = ENV['RESQUE_TERM_TIMEOUT'] || 4.0
+     self.term_child = ENV['TERM_CHILD']
+     self.graceful_term = ENV['GRACEFUL_TERM']
+     self.run_at_exit_hooks = ENV['RUN_AT_EXIT_HOOKS']
+
+     if ENV['BACKGROUND']
+       unless Process.respond_to?('daemon')
+         abort "env var BACKGROUND is set, which requires ruby >= 1.9"
+       end
+       Process.daemon(true)
+       self.reconnect
+     end
+
+     if ENV['PIDFILE']
+       File.open(ENV['PIDFILE'], 'w') { |f| f << pid }
+     end
+
+     self.queues = queues
+   end
+
+   def queues=(queues)
+     queues = queues.empty? ? (ENV["QUEUES"] || ENV['QUEUE']).to_s.split(',') : queues
+     @queues = queues.map { |queue| queue.to_s.strip }
+     unless ['*', '?', '{', '}', '[', ']'].any? {|char| @queues.join.include?(char) }
+       @static_queues = @queues.flatten.uniq
+     end
      validate_queues
    end

    # A worker must be given a queue, otherwise it won't know what to
    # do with itself.
@@ -140,10 +166,24 @@
      if @queues.nil? || @queues.empty?
        raise NoQueueError.new("Please give each worker at least one queue.")
      end
    end

+   # Returns a list of queues to use when searching for a job.
+   # A splat ("*") means you want every queue (in alpha order) - this
+   # can be useful for dynamically adding new queues.
+   def queues
+     return @static_queues if @static_queues
+     @queues.map { |queue| glob_match(queue) }.flatten.uniq
+   end
+
+   def glob_match(pattern)
+     Resque.queues.select do |queue|
+       File.fnmatch?(pattern, queue)
+     end.sort
+   end
+
    # This is the main workhorse method. Called on a Worker instance,
    # it begins the worker life cycle.
    #
    # The following events occur during a worker's life cycle:
    #
@@ -165,11 +205,11 @@
      loop do
        break if shutdown?

        if not paused? and job = reserve
-         log "got: #{job.inspect}"
+         log_with_severity :info, "got: #{job.inspect}"
          job.worker = self
          working_on job

          procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"
          if @child = fork(job)
@@ -178,16 +218,16 @@
            begin
              Process.waitpid(@child)
            rescue SystemCallError
              nil
            end
-           job.fail(DirtyExit.new($?.to_s)) if $?.signaled?
+           job.fail(DirtyExit.new("Child process received unhandled signal #{$?.stopsig}")) if $?.signaled?
          else
            unregister_signal_handlers if will_fork? && term_child

            begin
-             reconnect
+             reconnect if will_fork?
              perform(job, &block)
            rescue Exception => exception
              report_failed_job(job,exception)
            end
@@ -199,20 +239,20 @@
          done_working
          @child = nil
        else
          break if interval.zero?
-         log! "Sleeping for #{interval} seconds"
-         procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
+         log_with_severity :debug, "Sleeping for #{interval} seconds"
+         procline paused? ? "Paused" : "Waiting for #{queues.join(',')}"
          sleep interval
        end
      end

      unregister_worker
    rescue Exception => exception
      unless exception.class == SystemExit && !@child && run_at_exit_hooks
-       log "Failed to start worker : #{exception.inspect}"
+       log_with_severity :error, "Failed to start worker : #{exception.inspect}"
        unregister_worker(exception)
      end
    end
@@ -228,20 +268,20 @@
      done_working
    end

    # Reports the exception and marks the job as failed
    def report_failed_job(job,exception)
-     log "#{job.inspect} failed: #{exception.inspect}"
+     log_with_severity :error, "#{job.inspect} failed: #{exception.inspect}"
      begin
        job.fail(exception)
      rescue Object => exception
-       log "Received exception when reporting failure: #{exception.inspect}"
+       log_with_severity :error, "Received exception when reporting failure: #{exception.inspect}"
      end
      begin
        failed!
      rescue Object => exception
-       log "Received exception when increasing failed jobs counter (redis issue) : #{exception.inspect}"
+       log_with_severity :error, "Received exception when increasing failed jobs counter (redis issue) : #{exception.inspect}"
      end
    end

    # Processes a given job in the child.
    def perform(job)
@@ -249,31 +289,31 @@
        run_hook :after_fork, job if will_fork?
        job.perform
      rescue Object => e
        report_failed_job(job,e)
      else
-       log "done: #{job.inspect}"
+       log_with_severity :info, "done: #{job.inspect}"
      ensure
        yield job if block_given?
      end
    end

    # Attempts to grab a job off one of the provided queues. Returns
    # nil if no job can be found.
    def reserve
      queues.each do |queue|
-       log! "Checking #{queue}"
+       log_with_severity :debug, "Checking #{queue}"
        if job = Resque.reserve(queue)
-         log! "Found job on #{queue}"
+         log_with_severity :debug, "Found job on #{queue}"
          return job
        end
      end

      nil
    rescue Exception => e
-     log "Error reserving job: #{e.inspect}"
-     log e.backtrace.join("\n")
+     log_with_severity :error, "Error reserving job: #{e.inspect}"
+     log_with_severity :error, e.backtrace.join("\n")
      raise e
    end

    # Reconnect to Redis to avoid sharing a connection with the parent,
    # retry up to 3 times with increasing delay before giving up.
@@ -281,53 +321,33 @@
      tries = 0
      begin
        redis.client.reconnect
      rescue Redis::BaseConnectionError
        if (tries += 1) <= 3
-         log "Error reconnecting to Redis; retrying"
+         log_with_severity :error, "Error reconnecting to Redis; retrying"
          sleep(tries)
          retry
        else
-         log "Error reconnecting to Redis; quitting"
+         log_with_severity :error, "Error reconnecting to Redis; quitting"
          raise
        end
      end
    end

-   # Returns a list of queues to use when searching for a job.
-   # A splat ("*") means you want every queue (in alpha order) - this
-   # can be useful for dynamically adding new queues.
-   def queues
-     @queues.map do |queue|
-       queue.strip!
-       if (matched_queues = glob_match(queue)).empty?
-         queue
-       else
-         matched_queues
-       end
-     end.flatten.uniq
-   end
-
-   def glob_match(pattern)
-     Resque.queues.select do |queue|
-       File.fnmatch?(pattern, queue)
-     end.sort
-   end
-
    # Not every platform supports fork. Here we do our magic to
    # determine if yours does.
    def fork(job)
-     return if @cant_fork
+     return unless will_fork?

      # Only run before_fork hooks if we're actually going to fork
-     # (after checking @cant_fork)
+     # (after checking will_fork?)
      run_hook :before_fork, job

      begin
        # IronRuby doesn't support `Kernel.fork` yet
        if Kernel.respond_to?(:fork)
-         Kernel.fork if will_fork?
+         Kernel.fork
        else
          raise NotImplementedError
        end
      rescue NotImplementedError
        @cant_fork = true
@@ -335,13 +355,13 @@
      end
    end

    # Runs all the methods needed when a worker begins its lifecycle.
    def startup
-     Kernel.warn "WARNING: This way of doing signal handling is now deprecated. Please see http://hone.heroku.com/resque/2012/08/21/resque-signals.html for more info." unless term_child or $TESTING
      enable_gc_optimizations
      register_signal_handlers
+     start_heartbeat
      prune_dead_workers
      run_hook :before_first_fork
      register_worker

      # Fix buffering so we can `rake resque:work > resque.log` and
@@ -364,11 +384,11 @@
    # QUIT: Shutdown after the current job has finished processing.
    # USR1: Kill the forked child immediately, continue processing jobs.
    # USR2: Don't process any new jobs
    # CONT: Start processing jobs again after a USR2
    def register_signal_handlers
-     trap('TERM') { shutdown! }
+     trap('TERM') { graceful_term ? shutdown : shutdown! }
      trap('INT') { shutdown! }

      begin
        trap('QUIT') { shutdown }
        if term_child
@@ -377,23 +397,23 @@
          trap('USR1') { kill_child }
        end
        trap('USR2') { pause_processing }
        trap('CONT') { unpause_processing }
      rescue ArgumentError
-       warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
+       log_with_severity :warn, "Signals QUIT, USR1, USR2, and/or CONT not supported."
      end

-     log! "Registered signals"
+     log_with_severity :debug, "Registered signals"
    end

    def unregister_signal_handlers
      trap('TERM') do
-       trap ('TERM') do
-         # ignore subsequent terms
-       end
-       raise TermException.new("SIGTERM")
-     end
+       trap ('TERM') do
+         # ignore subsequent terms
+       end
+       raise TermException.new("SIGTERM")
+     end
      trap('INT', 'DEFAULT')

      begin
        trap('QUIT', 'DEFAULT')
        trap('USR1', 'DEFAULT')
@@ -403,19 +423,28 @@
    end

    # Schedule this worker for shutdown. Will finish processing the
    # current job.
    def shutdown
-     log 'Exiting...'
+     log_with_severity :info, 'Exiting...'
      @shutdown = true
    end

    # Kill the child and shutdown immediately.
+   # If not forking, abort this process.
    def shutdown!
      shutdown
      if term_child
-       new_kill_child
+       if fork_per_job?
+         new_kill_child
+       else
+         # Raise TermException in the same process
+         trap('TERM') do
+           # ignore subsequent terms
+         end
+         raise TermException.new("SIGTERM")
+       end
      else
        kill_child
      end
    end
@@ -426,58 +455,102 @@
    # Kills the forked child immediately, without remorse. The job it
    # is processing will not be completed.
    def kill_child
      if @child
-       log! "Killing child at #{@child}"
+       log_with_severity :debug, "Killing child at #{@child}"
        if `ps -o pid,state -p #{@child}`
          Process.kill("KILL", @child) rescue nil
        else
-         log! "Child #{@child} not found, restarting."
+         log_with_severity :debug, "Child #{@child} not found, restarting."
          shutdown
        end
      end
    end

+   def heartbeat
+     heartbeat = redis.hget(WORKER_HEARTBEAT_KEY, to_s)
+     heartbeat && Time.parse(heartbeat)
+   end
+
+   def self.all_heartbeats
+     redis.hgetall(WORKER_HEARTBEAT_KEY)
+   end
+
+   # Returns a list of workers that have sent a heartbeat in the past, but which
+   # already expired (does NOT include workers that have never sent a heartbeat at all).
+   def self.all_workers_with_expired_heartbeats
+     workers = Worker.all
+     heartbeats = Worker.all_heartbeats
+
+     workers.select do |worker|
+       id = worker.to_s
+       heartbeat = heartbeats[id]
+
+       if heartbeat
+         seconds_since_heartbeat = (Time.now - Time.parse(heartbeat)).to_i
+         seconds_since_heartbeat > Resque.prune_interval
+       else
+         false
+       end
+     end
+   end
+
+   def heartbeat!(time = Time.now)
+     redis.hset(WORKER_HEARTBEAT_KEY, to_s, time.iso8601)
+   end
+
+   def start_heartbeat
+     heartbeat!
+     @heart = Thread.new do
+       loop do
+         sleep(Resque.heartbeat_interval)
+         heartbeat!
+       end
+     end
+   end
+
    # Kills the forked child immediately with minimal remorse. The job it
    # is processing will not be completed. Send the child a TERM signal,
    # wait 5 seconds, and then a KILL signal if it has not quit
    def new_kill_child
      if @child
        unless Process.waitpid(@child, Process::WNOHANG)
-         log! "Sending TERM signal to child #{@child}"
+         log_with_severity :debug, "Sending TERM signal to child #{@child}"
          Process.kill("TERM", @child)
          (term_timeout.to_f * 10).round.times do |i|
            sleep(0.1)
            return if Process.waitpid(@child, Process::WNOHANG)
          end
-         log! "Sending KILL signal to child #{@child}"
+         log_with_severity :debug, "Sending KILL signal to child #{@child}"
          Process.kill("KILL", @child)
        else
-         log! "Child #{@child} already quit."
+         log_with_severity :debug, "Child #{@child} already quit."
        end
      end
    rescue SystemCallError
-     log! "Child #{@child} already quit and reaped."
+     log_with_severity :error, "Child #{@child} already quit and reaped."
    end

    # are we paused?
    def paused?
      @paused
    end

    # Stop processing jobs after the current one has completed (if we're
    # currently running one).
    def pause_processing
-     log "USR2 received; pausing job processing"
+     log_with_severity :info, "USR2 received; pausing job processing"
+     run_hook :before_pause, self
      @paused = true
    end

    # Start processing jobs again after a pause
    def unpause_processing
-     log "CONT received; resuming job processing"
+     log_with_severity :info, "CONT received; resuming job processing"
      @paused = false
+     run_hook :after_pause, self
    end

    # Looks for any workers which should be running on this server
    # and, if they're not, removes them from Redis.
    #
@@ -488,25 +561,44 @@
    #
    # By checking the current Redis state against the actual
    # environment, we can determine if Redis is old and clean it up a bit.
    def prune_dead_workers
      all_workers = Worker.all
-     known_workers = worker_pids unless all_workers.empty?
+
+     unless all_workers.empty?
+       known_workers = worker_pids
+       all_workers_with_expired_heartbeats = Worker.all_workers_with_expired_heartbeats
+     end
+
      all_workers.each do |worker|
+       # If the worker hasn't sent a heartbeat, remove it from the registry.
+       #
+       # If the worker hasn't ever sent a heartbeat, we won't remove it since
+       # the first heartbeat is sent before the worker is registred it means
+       # that this is a worker that doesn't support heartbeats, e.g., another
+       # client library or an older version of Resque. We won't touch these.
+       if all_workers_with_expired_heartbeats.include?(worker)
+         log_with_severity :info, "Pruning dead worker: #{worker}"
+         worker.unregister_worker(PruneDeadWorkerDirtyExit.new(worker.to_s))
+         next
+       end
+
        host, pid, worker_queues_raw = worker.id.split(':')
        worker_queues = worker_queues_raw.split(",")
        unless @queues.include?("*") || (worker_queues.to_set == @queues.to_set)
          # If the worker we are trying to prune does not belong to the queues
-         # we are listening to, we should not touch it. 
+         # we are listening to, we should not touch it.
          # Attempt to prune a worker from different queues may easily result in
-         # an unknown class exception, since that worker could easily be even 
+         # an unknown class exception, since that worker could easily be even
          # written in different language.
          next
        end
+
        next unless host == hostname
        next if known_workers.include?(pid)
-       log! "Pruning dead worker: #{worker}"
+
+       log_with_severity :debug, "Pruning dead worker: #{worker}"
        worker.unregister_worker
      end
    end

    # Registers ourself as a worker. Useful when entering the worker
@@ -519,39 +611,61 @@
      end
    end

    # Runs a named hook, passing along any arguments.
    def run_hook(name, *args)
      return unless hooks = Resque.send(name)
+     return if name == :before_first_fork && @before_first_fork_hook_ran
      msg = "Running #{name} hooks"
      msg << " with #{args.inspect}" if args.any?
-     log msg
+     log_with_severity :info, msg

      hooks.each do |hook|
        args.any? ? hook.call(*args) : hook.call
+       @before_first_fork_hook_ran = true if name == :before_first_fork
      end
    end

+   def kill_background_threads
+     @heart.kill if @heart
+   end
+
    # Unregisters ourself as a worker. Useful when shutting down.
    def unregister_worker(exception = nil)
      # If we're still processing a job, make sure it gets logged as a
      # failure.
      if (hash = processing) && !hash.empty?
        job = Job.new(hash['queue'], hash['payload'])
        # Ensure the proper worker is attached to this job, even if
        # it's not the precise instance that died.
        job.worker = self
-       job.fail(exception || DirtyExit.new)
+       begin
+         job.fail(exception || DirtyExit.new("Job still being processed"))
+       rescue RuntimeError => e
+         log_with_severity :error, e.message
+       end
      end

+     kill_background_threads
+
      redis.pipelined do
        redis.srem(:workers, self)
        redis.del("worker:#{self}")
        redis.del("worker:#{self}:started")
+       redis.hdel(WORKER_HEARTBEAT_KEY, self.to_s)

        Stat.clear("processed:#{self}")
        Stat.clear("failed:#{self}")
      end
+   rescue Exception => exception_while_unregistering
+     message = exception_while_unregistering.message
+     if exception
+       message = message + "\nOriginal Exception (#{exception.class}): #{exception.message}\n" +
+         "  #{exception.backtrace.join("  \n")}"
+     end
+     fail(exception_while_unregistering.class,
+          message,
+          exception_while_unregistering.backtrace)
    end

    # Given a job, tells Redis we're working on it. Useful for seeing
    # what workers are doing and when.
    def working_on(job)
@@ -602,13 +716,15 @@
    def started!
      redis.set("worker:#{self}:started", Time.now.to_s)
    end

    # Returns a hash explaining the Job we're currently processing, if any.
-   def job
-     decode(redis.get("worker:#{self}")) || {}
+   def job(reload = true)
+     @job = nil if reload
+     @job ||= decode(redis.get("worker:#{self}")) || {}
    end
+   attr_writer :job
    alias_method :processing, :job

    # Boolean - true if working, false if not
    def working?
      state == :working
@@ -618,13 +734,17 @@
    def idle?
      state == :idle
    end

    def will_fork?
-     !@cant_fork && !$TESTING && (ENV["FORK_PER_JOB"] != 'false')
+     !@cant_fork && fork_per_job?
    end

+   def fork_per_job?
+     ENV["FORK_PER_JOB"] != 'false'
+   end
+
    # Returns a symbol representing the current worker state,
    # which can be either :working or :idle
    def state
      redis.exists("worker:#{self}") ? :working : :idle
    end
@@ -675,11 +795,11 @@
    end

    # Find Resque worker pids on Linux and OS X.
    #
    def linux_worker_pids
-     `ps -A -o pid,command | grep "[r]esque" | grep -v "resque-web"`.split("\n").map do |line|
+     `ps -A -o pid,command | grep -E "[r]esque:work|[r]esque-[0-9]" | grep -v "resque-web"`.split("\n").map do |line|
        line.split(' ')[0]
      end
    end

    # Find Resque worker pids on Solaris.
    #
@@ -696,71 +816,60 @@
      end.compact
    end

    # Given a string, sets the procline ($0) and logs.
    # Procline is always in the format of:
-   #   resque-VERSION: STRING
+   #   RESQUE_PROCLINE_PREFIXresque-VERSION: STRING
    def procline(string)
-     $0 = "resque-#{Resque::Version}: #{string}"
-     log! $0
+     $0 = "#{ENV['RESQUE_PROCLINE_PREFIX']}resque-#{Resque::Version}: #{string}"
+     log_with_severity :debug, $0
    end

-   # Log a message to Resque.logger
-   # can't use alias_method since info/debug are private methods
    def log(message)
      info(message)
    end

    def log!(message)
      debug(message)
    end

-   # Deprecated legacy methods for controlling the logging threshhold
-   # Use Resque.logger.level now, e.g.:
-   #
-   #   Resque.logger.level = Logger::DEBUG
-   #
+
    def verbose
-     logger_severity_deprecation_warning
      @verbose
    end

    def very_verbose
-     logger_severity_deprecation_warning
      @very_verbose
    end

    def verbose=(value);
-     logger_severity_deprecation_warning
-
      if value && !very_verbose
        Resque.logger.formatter = VerboseFormatter.new
+       Resque.logger.level = Logger::INFO
      elsif !value
        Resque.logger.formatter = QuietFormatter.new
      end

      @verbose = value
    end

    def very_verbose=(value)
-     logger_severity_deprecation_warning
      if value
        Resque.logger.formatter = VeryVerboseFormatter.new
+       Resque.logger.level = Logger::DEBUG
      elsif !value && verbose
        Resque.logger.formatter = VerboseFormatter.new
+       Resque.logger.level = Logger::INFO
      else
        Resque.logger.formatter = QuietFormatter.new
      end

      @very_verbose = value
    end

-   def logger_severity_deprecation_warning
-     return if $TESTING
-     return if $warned_logger_severity_deprecation
-     Kernel.warn "*** DEPRECATION WARNING: Resque::Worker#verbose and #very_verbose are deprecated. Please set Resque.logger.level instead"
-     Kernel.warn "Called from: #{caller[0..5].join("\n\t")}"
-     $warned_logger_severity_deprecation = true
-     nil
+   private
+
+   def log_with_severity(severity, message)
+     Logging.log(severity, message)
    end
  end
end
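Worker configuration that used to be wired up only in the resque:work rake task (logging flags, TERM handling, daemonizing, pidfiles) is now read from the environment inside #initialize, so the same knobs apply however the worker is constructed. A sketch of the signal-related settings, based only on the env vars read above; the queue names, timeout value, and pidfile path are illustrative examples:

    # Equivalent to running: TERM_CHILD=1 GRACEFUL_TERM=1 RESQUE_TERM_TIMEOUT=10 rake resque:work
    ENV['TERM_CHILD']          = '1'             # use new_kill_child: TERM the child, KILL after the timeout
    ENV['GRACEFUL_TERM']       = '1'             # SIGTERM behaves like SIGQUIT: finish the current job first
    ENV['RESQUE_TERM_TIMEOUT'] = '10'            # seconds to wait between TERM and KILL (default 4.0)
    ENV['PIDFILE']             = 'tmp/resque.pid'  # example path; the worker writes its pid here

    worker = Resque::Worker.new('critical', 'default')
    worker.work(5)  # poll every 5 seconds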
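The new queues= writer separates static queue lists from glob patterns: when none of the given names contain fnmatch metacharacters the list is captured once in @static_queues, otherwise #queues re-expands the patterns against Resque.queues on every poll, so queues created later are picked up without restarting the worker. A sketch of the difference, using hypothetical queue and job class names:

    require 'resque'

    # No glob characters: the worker polls exactly these queues, in this order.
    static_worker = Resque::Worker.new('critical', 'mailers')
    static_worker.queues          # => ["critical", "mailers"]

    # A glob pattern is matched with File.fnmatch against the queues currently
    # known to Redis, so it only returns queues that already hold jobs.
    dynamic_worker = Resque::Worker.new('report_*')
    Resque.push('report_daily', 'class' => 'DailyReport', 'args' => [])
    dynamic_worker.queues         # => ["report_daily"]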
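The heartbeat support stores an ISO8601 timestamp per worker in the workers:heartbeat Redis hash (WORKER_HEARTBEAT_KEY) and refreshes it from a background thread started during startup; prune_dead_workers then removes registered workers whose last heartbeat is older than Resque.prune_interval, while workers that never wrote a heartbeat (older clients) are left alone. A minimal sketch of inspecting and triggering the pruning from a console, assuming Resque already points at the right Redis (the heartbeat_interval and prune_interval defaults live outside this file):

    require 'resque'
    require 'time'

    # Each registered worker id mapped to its last heartbeat (ISO8601 string).
    Resque::Worker.all_heartbeats.each do |id, stamp|
      age = (Time.now - Time.parse(stamp)).round
      puts "#{id}: last heartbeat #{age}s ago"
    end

    # Workers whose heartbeat is older than Resque.prune_interval.
    stale = Resque::Worker.all_workers_with_expired_heartbeats
    puts "#{stale.size} worker(s) look dead"

    # Every worker prunes stale entries during startup, but it can also be run by hand:
    Resque::Worker.new('*').prune_dead_workers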
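pause_processing and unpause_processing now run :before_pause and :after_pause hooks through run_hook, which resolves hooks via Resque.send(name). Assuming the matching hook writers are defined on the Resque module (they are not part of this file), registering them would look roughly like:

    Resque.before_pause do |worker|
      Resque.logger.info "#{worker} paused (USR2 received)"
    end

    Resque.after_pause do |worker|
      Resque.logger.info "#{worker} resumed (CONT received)"
    end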