lib/resque/worker.rb in resque-1.22.0 vs lib/resque/worker.rb in resque-1.23.0

- old
+ new

@@ -134,27 +134,26 @@ break if shutdown? if not paused? and job = reserve log "got: #{job.inspect}" job.worker = self - run_hook :before_fork, job working_on job - if @child = fork + if @child = fork(job) srand # Reseeding procline "Forked #{@child} at #{Time.now.to_i}" begin Process.waitpid(@child) rescue SystemCallError nil end else - unregister_signal_handlers if !@cant_fork && term_child + unregister_signal_handlers if will_fork? && term_child procline "Processing #{job.queue} since #{Time.now.to_i}" - redis.client.reconnect # Don't share connection with parent + reconnect perform(job, &block) - exit! unless @cant_fork + exit! if will_fork? end done_working @child = nil else @@ -163,12 +162,13 @@ procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}" sleep interval end end - ensure unregister_worker + rescue Exception => exception + unregister_worker(exception) end # DEPRECATED. Processes a single job. If none is given, it will # try to produce one. Usually run in the child. def process(job = nil, &block) @@ -182,11 +182,11 @@ end # Processes a given job in the child. def perform(job) begin - run_hook :after_fork, job + run_hook :after_fork, job if will_fork? job.perform rescue Object => e log "#{job.inspect} failed: #{e.inspect}" begin job.fail(e) @@ -217,28 +217,48 @@ log "Error reserving job: #{e.inspect}" log e.backtrace.join("\n") raise e end + # Reconnect to Redis to avoid sharing a connection with the parent, + # retry up to 3 times with increasing delay before giving up. + def reconnect + tries = 0 + begin + redis.client.reconnect + rescue Redis::BaseConnectionError + if (tries += 1) <= 3 + log "Error reconnecting to Redis; retrying" + sleep(tries) + retry + else + log "Error reconnecting to Redis; quitting" + raise + end + end + end + # Returns a list of queues to use when searching for a job. # A splat ("*") means you want every queue (in alpha order) - this # can be useful for dynamically adding new queues. def queues @queues.map {|queue| queue == "*" ? Resque.queues.sort : queue }.flatten.uniq end # Not every platform supports fork. Here we do our magic to # determine if yours does. - def fork - @cant_fork = true if $TESTING - + def fork(job) return if @cant_fork + + # Only run before_fork hooks if we're actually going to fork + # (after checking @cant_fork) + run_hook :before_fork, job begin # IronRuby doesn't support `Kernel.fork` yet if Kernel.respond_to?(:fork) - Kernel.fork + Kernel.fork if will_fork? else raise NotImplementedError end rescue NotImplementedError @cant_fork = true @@ -422,19 +442,19 @@ args.any? ? hook.call(*args) : hook.call end # Unregisters ourself as a worker. Useful when shutting down. - def unregister_worker + def unregister_worker(exception = nil) # If we're still processing a job, make sure it gets logged as a # failure. if (hash = processing) && !hash.empty? job = Job.new(hash['queue'], hash['payload']) # Ensure the proper worker is attached to this job, even if # it's not the precise instance that died. job.worker = self - job.fail(DirtyExit.new) + job.fail(exception || DirtyExit.new) end redis.srem(:workers, self) redis.del("worker:#{self}") redis.del("worker:#{self}:started") @@ -504,9 +524,13 @@ end # Boolean - true if idle, false if not def idle? state == :idle + end + + def will_fork? + !(@cant_fork || $TESTING) end # Returns a symbol representing the current worker state, # which can be either :working or :idle def state