lib/resque/worker.rb in resque-1.22.0 vs lib/resque/worker.rb in resque-1.23.0
- old
+ new
@@ -134,27 +134,26 @@
break if shutdown?
if not paused? and job = reserve
log "got: #{job.inspect}"
job.worker = self
- run_hook :before_fork, job
working_on job
- if @child = fork
+ if @child = fork(job)
srand # Reseeding
procline "Forked #{@child} at #{Time.now.to_i}"
begin
Process.waitpid(@child)
rescue SystemCallError
nil
end
else
- unregister_signal_handlers if !@cant_fork && term_child
+ unregister_signal_handlers if will_fork? && term_child
procline "Processing #{job.queue} since #{Time.now.to_i}"
- redis.client.reconnect # Don't share connection with parent
+ reconnect
perform(job, &block)
- exit! unless @cant_fork
+ exit! if will_fork?
end
done_working
@child = nil
else
@@ -163,12 +162,13 @@
procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
sleep interval
end
end
- ensure
unregister_worker
+ rescue Exception => exception
+ unregister_worker(exception)
end
# DEPRECATED. Processes a single job. If none is given, it will
# try to produce one. Usually run in the child.
def process(job = nil, &block)
@@ -182,11 +182,11 @@
end
# Processes a given job in the child.
def perform(job)
begin
- run_hook :after_fork, job
+ run_hook :after_fork, job if will_fork?
job.perform
rescue Object => e
log "#{job.inspect} failed: #{e.inspect}"
begin
job.fail(e)
@@ -217,28 +217,48 @@
log "Error reserving job: #{e.inspect}"
log e.backtrace.join("\n")
raise e
end
+ # Reconnect to Redis to avoid sharing a connection with the parent,
+ # retry up to 3 times with increasing delay before giving up.
+ def reconnect
+ tries = 0
+ begin
+ redis.client.reconnect
+ rescue Redis::BaseConnectionError
+ if (tries += 1) <= 3
+ log "Error reconnecting to Redis; retrying"
+ sleep(tries)
+ retry
+ else
+ log "Error reconnecting to Redis; quitting"
+ raise
+ end
+ end
+ end
+
# Returns a list of queues to use when searching for a job.
# A splat ("*") means you want every queue (in alpha order) - this
# can be useful for dynamically adding new queues.
def queues
@queues.map {|queue| queue == "*" ? Resque.queues.sort : queue }.flatten.uniq
end
# Not every platform supports fork. Here we do our magic to
# determine if yours does.
- def fork
- @cant_fork = true if $TESTING
-
+ def fork(job)
return if @cant_fork
+
+ # Only run before_fork hooks if we're actually going to fork
+ # (after checking @cant_fork)
+ run_hook :before_fork, job
begin
# IronRuby doesn't support `Kernel.fork` yet
if Kernel.respond_to?(:fork)
- Kernel.fork
+ Kernel.fork if will_fork?
else
raise NotImplementedError
end
rescue NotImplementedError
@cant_fork = true
@@ -422,19 +442,19 @@
args.any? ? hook.call(*args) : hook.call
end
# Unregisters ourself as a worker. Useful when shutting down.
- def unregister_worker
+ def unregister_worker(exception = nil)
# If we're still processing a job, make sure it gets logged as a
# failure.
if (hash = processing) && !hash.empty?
job = Job.new(hash['queue'], hash['payload'])
# Ensure the proper worker is attached to this job, even if
# it's not the precise instance that died.
job.worker = self
- job.fail(DirtyExit.new)
+ job.fail(exception || DirtyExit.new)
end
redis.srem(:workers, self)
redis.del("worker:#{self}")
redis.del("worker:#{self}:started")
@@ -504,9 +524,13 @@
end
# Boolean - true if idle, false if not
def idle?
state == :idle
+ end
+
+ def will_fork?
+ !(@cant_fork || $TESTING)
end
# Returns a symbol representing the current worker state,
# which can be either :working or :idle
def state