lib/resque/worker.rb in resque-1.24.1 vs lib/resque/worker.rb in resque-1.25.0.pre

- old
+ new

@@ -1,20 +1,52 @@ require 'time' +require 'set' module Resque # A Resque Worker processes jobs. On platforms that support fork(2), # the worker will fork off a child to process each job. This ensures # a clean slate when beginning the next job and cuts down on gradual # memory growth as well as low level failures. # # It also ensures workers are always listening to signals from you, # their master, and can react accordingly. class Worker - extend Resque::Helpers - include Resque::Helpers include Resque::Logging + def redis + Resque.redis + end + + def self.redis + Resque.redis + end + + # Given a Ruby object, returns a string suitable for storage in a + # queue. + def encode(object) + if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load) + MultiJson.dump object + else + MultiJson.encode object + end + end + + # Given a string, returns a Ruby object. + def decode(object) + return unless object + + begin + if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load) + MultiJson.load object + else + MultiJson.decode object + end + rescue ::MultiJson::DecodeError => e + raise DecodeException, e.message, e.backtrace + end + end + # Boolean indicating whether this worker can or can not fork. # Automatically set if a fork(2) fails. attr_accessor :cant_fork attr_accessor :term_timeout @@ -137,30 +169,36 @@ if not paused? and job = reserve log "got: #{job.inspect}" job.worker = self working_on job - procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class}]" - if @child = fork(job) do - unregister_signal_handlers if term_child - reconnect - perform(job, &block) - exit! unless run_at_exit_hooks - end - + procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]" + if @child = fork(job) srand # Reseeding procline "Forked #{@child} at #{Time.now.to_i}" begin Process.waitpid(@child) rescue SystemCallError nil end job.fail(DirtyExit.new($?.to_s)) if $?.signaled? else - reconnect - perform(job, &block) + unregister_signal_handlers if will_fork? && term_child + begin + + reconnect + perform(job, &block) + + rescue Exception => exception + report_failed_job(job,exception) + end + + if will_fork? + run_at_exit_hooks ? exit : exit! + end end + done_working @child = nil else break if interval.zero? log! "Sleeping for #{interval} seconds" @@ -169,13 +207,15 @@ end end unregister_worker rescue Exception => exception - log "Failed to start worker : #{exception.inspect}" + unless exception.class == SystemExit && !@child && run_at_exit_hooks + log "Failed to start worker : #{exception.inspect}" - unregister_worker(exception) + unregister_worker(exception) + end end # DEPRECATED. Processes a single job. If none is given, it will # try to produce one. Usually run in the child. def process(job = nil, &block) @@ -186,23 +226,32 @@ perform(job, &block) ensure done_working end + # Reports the exception and marks the job as failed + def report_failed_job(job,exception) + log "#{job.inspect} failed: #{exception.inspect}" + begin + job.fail(exception) + rescue Object => exception + log "Received exception when reporting failure: #{exception.inspect}" + end + begin + failed! + rescue Object => exception + log "Received exception when increasing failed jobs counter (redis issue) : #{exception.inspect}" + end + end + # Processes a given job in the child. def perform(job) begin run_hook :after_fork, job if will_fork? job.perform rescue Object => e - log "#{job.inspect} failed: #{e.inspect}" - begin - job.fail(e) - rescue Object => e - log "Received exception when reporting failure: #{e.inspect}" - end - failed! + report_failed_job(job,e) else log "done: #{job.inspect}" ensure yield job if block_given? end @@ -246,26 +295,39 @@ # Returns a list of queues to use when searching for a job. # A splat ("*") means you want every queue (in alpha order) - this # can be useful for dynamically adding new queues. def queues - @queues.map {|queue| queue == "*" ? Resque.queues.sort : queue }.flatten.uniq + @queues.map do |queue| + queue.strip! + if (matched_queues = glob_match(queue)).empty? + queue + else + matched_queues + end + end.flatten.uniq end + def glob_match(pattern) + Resque.queues.select do |queue| + File.fnmatch?(pattern, queue) + end.sort + end + # Not every platform supports fork. Here we do our magic to # determine if yours does. - def fork(job,&block) + def fork(job) return if @cant_fork # Only run before_fork hooks if we're actually going to fork # (after checking @cant_fork) run_hook :before_fork, job begin # IronRuby doesn't support `Kernel.fork` yet if Kernel.respond_to?(:fork) - Kernel.fork &block if will_fork? + Kernel.fork if will_fork? else raise NotImplementedError end rescue NotImplementedError @cant_fork = true @@ -322,11 +384,16 @@ log! "Registered signals" end def unregister_signal_handlers - trap('TERM') { raise TermException.new("SIGTERM") } + trap('TERM') do + trap ('TERM') do + # ignore subsequent terms + end + raise TermException.new("SIGTERM") + end trap('INT', 'DEFAULT') begin trap('QUIT', 'DEFAULT') trap('USR1', 'DEFAULT') @@ -423,23 +490,34 @@ # environment, we can determine if Redis is old and clean it up a bit. def prune_dead_workers all_workers = Worker.all known_workers = worker_pids unless all_workers.empty? all_workers.each do |worker| - host, pid, queues = worker.id.split(':') + host, pid, worker_queues_raw = worker.id.split(':') + worker_queues = worker_queues_raw.split(",") + unless @queues.include?("*") || (worker_queues.to_set == @queues.to_set) + # If the worker we are trying to prune does not belong to the queues + # we are listening to, we should not touch it. + # Attempt to prune a worker from different queues may easily result in + # an unknown class exception, since that worker could easily be even + # written in different language. + next + end next unless host == hostname next if known_workers.include?(pid) log! "Pruning dead worker: #{worker}" worker.unregister_worker end end # Registers ourself as a worker. Useful when entering the worker # lifecycle on startup. def register_worker - redis.sadd(:workers, self) - started! + redis.pipelined do + redis.sadd(:workers, self) + started! + end end # Runs a named hook, passing along any arguments. def run_hook(name, *args) return unless hooks = Resque.send(name) @@ -462,16 +540,18 @@ # it's not the precise instance that died. job.worker = self job.fail(exception || DirtyExit.new) end - redis.srem(:workers, self) - redis.del("worker:#{self}") - redis.del("worker:#{self}:started") + redis.pipelined do + redis.srem(:workers, self) + redis.del("worker:#{self}") + redis.del("worker:#{self}:started") - Stat.clear("processed:#{self}") - Stat.clear("failed:#{self}") + Stat.clear("processed:#{self}") + Stat.clear("failed:#{self}") + end end # Given a job, tells Redis we're working on it. Useful for seeing # what workers are doing and when. def working_on(job) @@ -483,12 +563,14 @@ end # Called when we are done working - clears our `working_on` state # and tells Redis we processed a job. def done_working - processed! - redis.del("worker:#{self}") + redis.pipelined do + processed! + redis.del("worker:#{self}") + end end # How many jobs has this worker processed? Returns an int. def processed Stat["processed:#{self}"] @@ -576,9 +658,11 @@ # Returns an Array of string pids of all the other workers on this # machine. Useful when pruning dead workers on startup. def worker_pids if RUBY_PLATFORM =~ /solaris/ solaris_worker_pids + elsif RUBY_PLATFORM =~ /mingw32/ + windows_worker_pids else linux_worker_pids end end