lib/resque/worker.rb in resque-1.24.1 vs lib/resque/worker.rb in resque-1.25.0.pre
--- lib/resque/worker.rb (resque-1.24.1, old)
+++ lib/resque/worker.rb (resque-1.25.0.pre, new)
@@ -1,20 +1,52 @@
require 'time'
+require 'set'
module Resque
# A Resque Worker processes jobs. On platforms that support fork(2),
# the worker will fork off a child to process each job. This ensures
# a clean slate when beginning the next job and cuts down on gradual
# memory growth as well as low level failures.
#
# It also ensures workers are always listening to signals from you,
# their master, and can react accordingly.
class Worker
- extend Resque::Helpers
- include Resque::Helpers
include Resque::Logging
+ def redis
+ Resque.redis
+ end
+
+ def self.redis
+ Resque.redis
+ end
+
+ # Given a Ruby object, returns a string suitable for storage in a
+ # queue.
+ def encode(object)
+ if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
+ MultiJson.dump object
+ else
+ MultiJson.encode object
+ end
+ end
+
+ # Given a string, returns a Ruby object.
+ def decode(object)
+ return unless object
+
+ begin
+ if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
+ MultiJson.load object
+ else
+ MultiJson.decode object
+ end
+ rescue ::MultiJson::DecodeError => e
+ raise DecodeException, e.message, e.backtrace
+ end
+ end
+
# Boolean indicating whether this worker can or cannot fork.
# Automatically set if a fork(2) fails.
attr_accessor :cant_fork
attr_accessor :term_timeout
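
Note: with Resque::Helpers no longer mixed in, each worker carries its own
redis accessor and JSON helpers. A minimal sketch of the encode/decode round
trip (the payload hash is illustrative, not taken from this diff):

    worker = Resque::Worker.new(:default)
    json = worker.encode('class' => 'Archive', 'args' => [35])
    # => "{\"class\":\"Archive\",\"args\":[35]}"
    worker.decode(json)
    # => {"class"=>"Archive", "args"=>[35]}
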
@@ -137,30 +169,36 @@
if not paused? and job = reserve
log "got: #{job.inspect}"
job.worker = self
working_on job
- procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class}]"
- if @child = fork(job) do
- unregister_signal_handlers if term_child
- reconnect
- perform(job, &block)
- exit! unless run_at_exit_hooks
- end
-
+ procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"
+ if @child = fork(job)
srand # Reseeding
procline "Forked #{@child} at #{Time.now.to_i}"
begin
Process.waitpid(@child)
rescue SystemCallError
nil
end
job.fail(DirtyExit.new($?.to_s)) if $?.signaled?
else
- reconnect
- perform(job, &block)
+ unregister_signal_handlers if will_fork? && term_child
+ begin
+ reconnect
+ perform(job, &block)
+ rescue Exception => exception
+ report_failed_job(job, exception)
+ end
+
+ if will_fork?
+ run_at_exit_hooks ? exit : exit!
+ end
end
+
done_working
@child = nil
else
break if interval.zero?
log! "Sleeping for #{interval} seconds"
@@ -169,13 +207,15 @@
end
end
unregister_worker
rescue Exception => exception
- log "Failed to start worker : #{exception.inspect}"
+ unless exception.class == SystemExit && !@child && run_at_exit_hooks
+ log "Failed to start worker : #{exception.inspect}"
- unregister_worker(exception)
+ unregister_worker(exception)
+ end
end
# DEPRECATED. Processes a single job. If none is given, it will
# try to produce one. Usually run in the child.
def process(job = nil, &block)
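
Note: the SystemExit guard above matters because SystemExit inherits from
Exception but not StandardError, so the enclosing `rescue Exception` would
otherwise report a normal `exit` from a forked child as a startup failure.
A quick demonstration:

    begin
      exit
    rescue Exception => e
      e.class  # => SystemExit; a bare `rescue => e` would not catch this
    end
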
@@ -186,23 +226,32 @@
perform(job, &block)
ensure
done_working
end
+ # Reports the exception and marks the job as failed
+ def report_failed_job(job, exception)
+ log "#{job.inspect} failed: #{exception.inspect}"
+ begin
+ job.fail(exception)
+ rescue Object => exception
+ log "Received exception when reporting failure: #{exception.inspect}"
+ end
+ begin
+ failed!
+ rescue Object => exception
+ log "Received exception when increasing failed jobs counter (redis issue) : #{exception.inspect}"
+ end
+ end
+
# Processes a given job in the child.
def perform(job)
begin
run_hook :after_fork, job if will_fork?
job.perform
rescue Object => e
- log "#{job.inspect} failed: #{e.inspect}"
- begin
- job.fail(e)
- rescue Object => e
- log "Received exception when reporting failure: #{e.inspect}"
- end
- failed!
+ report_failed_job(job, e)
else
log "done: #{job.inspect}"
ensure
yield job if block_given?
end
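
Note: report_failed_job rescues Object rather than the default StandardError,
so even errors outside the StandardError hierarchy are logged instead of
killing the worker. For example:

    begin
      raise NotImplementedError, 'stub'  # a ScriptError, not a StandardError
    rescue Object => e
      # caught here; a bare `rescue => e` would have let it propagate
    end
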
@@ -246,26 +295,39 @@
# Returns a list of queues to use when searching for a job.
# A splat ("*") means you want every queue (in alpha order) - this
# can be useful for dynamically adding new queues.
def queues
- @queues.map {|queue| queue == "*" ? Resque.queues.sort : queue }.flatten.uniq
+ @queues.map do |queue|
+ queue.strip!
+ if (matched_queues = glob_match(queue)).empty?
+ queue
+ else
+ matched_queues
+ end
+ end.flatten.uniq
end
+ def glob_match(pattern)
+ Resque.queues.select do |queue|
+ File.fnmatch?(pattern, queue)
+ end.sort
+ end
+
# Not every platform supports fork. Here we do our magic to
# determine if yours does.
- def fork(job,&block)
+ def fork(job)
return if @cant_fork
# Only run before_fork hooks if we're actually going to fork
# (after checking @cant_fork)
run_hook :before_fork, job
begin
# IronRuby doesn't support `Kernel.fork` yet
if Kernel.respond_to?(:fork)
- Kernel.fork &block if will_fork?
+ Kernel.fork if will_fork?
else
raise NotImplementedError
end
rescue NotImplementedError
@cant_fork = true
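
Note: queue names are now expanded with File.fnmatch?, so glob patterns
beyond a lone "*" work. An illustrative session (the queue names are
assumptions, not taken from this diff):

    File.fnmatch?('h*', 'high')  # => true
    File.fnmatch?('h*', 'low')   # => false
    # With queues "high", "low" and "critical" registered:
    # Resque::Worker.new('h*', 'low').queues  # => ["high", "low"]
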
@@ -322,11 +384,16 @@
log! "Registered signals"
end
def unregister_signal_handlers
- trap('TERM') { raise TermException.new("SIGTERM") }
+ trap('TERM') do
+ trap('TERM') do
+ # Ignore any subsequent TERM signals so shutdown cannot be interrupted.
+ end
+ raise TermException.new("SIGTERM")
+ end
trap('INT', 'DEFAULT')
begin
trap('QUIT', 'DEFAULT')
trap('USR1', 'DEFAULT')
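
Note: re-trapping TERM inside the handler makes the first TERM raise
TermException while any later TERMs are swallowed, so a repeated signal
cannot interrupt the shutdown path. A standalone demonstration of the
pattern:

    trap('TERM') do
      trap('TERM') { }  # later TERMs become no-ops
      puts 'first TERM handled'
    end
    Process.kill('TERM', Process.pid)  # prints "first TERM handled"
    Process.kill('TERM', Process.pid)  # silently ignored
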
@@ -423,23 +490,34 @@
# environment, we can determine if Redis is old and clean it up a bit.
def prune_dead_workers
all_workers = Worker.all
known_workers = worker_pids unless all_workers.empty?
all_workers.each do |worker|
- host, pid, queues = worker.id.split(':')
+ host, pid, worker_queues_raw = worker.id.split(':')
+ worker_queues = worker_queues_raw.split(",")
+ unless @queues.include?("*") || (worker_queues.to_set == @queues.to_set)
+ # If the worker we are trying to prune does not belong to the queues
+ # we are listening to, we should not touch it.
+ # Attempting to prune a worker from different queues could easily raise
+ # an unknown-class exception, since that worker may even be written in a
+ # different language.
+ next
+ end
next unless host == hostname
next if known_workers.include?(pid)
log! "Pruning dead worker: #{worker}"
worker.unregister_worker
end
end
# Registers ourself as a worker. Useful when entering the worker
# lifecycle on startup.
def register_worker
- redis.sadd(:workers, self)
- started!
+ redis.pipelined do
+ redis.sadd(:workers, self)
+ started!
+ end
end
# Runs a named hook, passing along any arguments.
def run_hook(name, *args)
return unless hooks = Resque.send(name)
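
Note: redis-rb's pipelined block queues the commands and sends them in a
single round trip, so registering a worker now costs one network hop instead
of two. A sketch against the block form this diff uses (the worker id is
illustrative; newer redis-rb versions yield a pipeline object instead):

    redis.pipelined do
      redis.sadd(:workers, 'host:1234:high,low')
      redis.set('worker:host:1234:high,low:started', Time.now.to_s)
    end
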
@@ -462,16 +540,18 @@
# it's not the precise instance that died.
job.worker = self
job.fail(exception || DirtyExit.new)
end
- redis.srem(:workers, self)
- redis.del("worker:#{self}")
- redis.del("worker:#{self}:started")
- Stat.clear("processed:#{self}")
- Stat.clear("failed:#{self}")
+ redis.pipelined do
+ redis.srem(:workers, self)
+ redis.del("worker:#{self}")
+ redis.del("worker:#{self}:started")
+ Stat.clear("processed:#{self}")
+ Stat.clear("failed:#{self}")
+ end
end
# Given a job, tells Redis we're working on it. Useful for seeing
# what workers are doing and when.
def working_on(job)
@@ -483,12 +563,14 @@
end
# Called when we are done working - clears our `working_on` state
# and tells Redis we processed a job.
def done_working
- processed!
- redis.del("worker:#{self}")
+ redis.pipelined do
+ processed!
+ redis.del("worker:#{self}")
+ end
end
# How many jobs has this worker processed? Returns an int.
def processed
Stat["processed:#{self}"]
@@ -576,9 +658,11 @@
# Returns an Array of string pids of all the other workers on this
# machine. Useful when pruning dead workers on startup.
def worker_pids
if RUBY_PLATFORM =~ /solaris/
solaris_worker_pids
+ elsif RUBY_PLATFORM =~ /mingw32/
+ windows_worker_pids
else
linux_worker_pids
end
end
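
Note: this hunk adds the mingw32 branch but does not show the new
windows_worker_pids helper itself. A hypothetical sketch of such a helper
using tasklist (an assumption for illustration, not the shipped code):

    def windows_worker_pids
      # Keep only the PID lines of tasklist's list-format output.
      `tasklist /FI "IMAGENAME eq ruby.exe" /FO list`.
        split($/).
        select { |line| line =~ /^PID:/ }.
        map { |line| line.sub(/PID:\s+/, '') }
    end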