lib/resque/worker.rb in mongo-resque-1.19.0.1 vs lib/resque/worker.rb in mongo-resque-1.20.0
- old
+ new
@@ -22,17 +22,17 @@
attr_writer :to_s
# Returns an array of all worker objects.
def self.all
- mongo_workers.distinct(:worker).map{|worker| find(worker)}.compact
+ mongo_workers.distinct(:worker).map { |worker| find(worker) }.compact
end
# Returns an array of all worker objects currently processing
# jobs.
def self.working
- working = mongo_workers.find({ 'working_on' => { '$exists' => true}}).to_a.map{|w| find(w['worker'])}
+ mongo_workers.find('working_on' => { '$exists' => true }).to_a.map { |w| find(w['worker']) }
end
# Returns a single worker object. Accepts a string id.
def self.find(worker_id)
if exists? worker_id
@@ -51,11 +51,11 @@
end
# Given a string worker id, return a boolean indicating whether the
# worker exists
def self.exists?(worker_id)
- mongo_workers.find({ :worker => worker_id.to_s}).count > 0
+ mongo_workers.find(:worker => worker_id.to_s).count > 0
end
# Workers should be initialized with an array of string queue
# names. The order is important: a Worker will check the first
# queue given for a job. If none is found, it will check the
@@ -106,17 +106,18 @@
loop do
break if shutdown?
if not paused? and job = reserve
log "got: #{job.inspect}"
+ job.worker = self
run_hook :before_fork, job
working_on job
if @child = fork
srand # Reseeding
procline "Forked #{@child} at #{Time.now.to_i}"
- Process.wait
+ Process.wait(@child)
else
procline "Processing #{job.queue} since #{Time.now.to_i}"
perform(job, &block)
exit! unless @cant_fork
end
@@ -138,10 +139,11 @@
# DEPRECATED. Processes a single job. If none is given, it will
# try to produce one. Usually run in the child.
def process(job = nil, &block)
return unless job ||= reserve
+ job.worker = self
working_on job
perform(job, &block)
ensure
done_working
end
@@ -186,11 +188,11 @@
# Returns a list of queues to use when searching for a job.
# A splat ("*") means you want every queue (in alpha order) - this
# can be useful for dynamically adding new queues.
def queues
- @queues[0] == "*" ? Resque.queues.sort : @queues
+ @queues.map {|queue| queue == "*" ? Resque.queues.sort : queue }.flatten.uniq
end
# Not every platform supports fork. Here we do our magic to
# determine if yours does.
def fork
@@ -364,11 +366,10 @@
end
# Given a job, tells Mongo we're working on it. Useful for seeing
# what workers are doing and when.
def working_on(job)
- job.worker = self
data = { :queue => job.queue,
:run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z"),
:payload => job.payload }
mongo_workers.update({:worker => self.to_s}, { '$set' => { 'working_on' => data}}, :upsert => true)
end
@@ -457,11 +458,11 @@
@hostname ||= `hostname`.chomp
end
# Returns Integer PID of running worker
def pid
- @pid ||= to_s.split(":")[1].to_i
+ Process.pid
end
# Returns an Array of string pids of all the other workers on this
# machine. Useful when pruning dead workers on startup.
def worker_pids
@@ -475,20 +476,20 @@
# Find Resque worker pids on Linux and OS X.
#
# Returns an Array of string pids of all the other workers on this
# machine. Useful when pruning dead workers on startup.
def linux_worker_pids
- `ps -A -o pid,command | grep [r]esque | grep -v "resque-web"`.split("\n").map do |line|
+ `ps -A -o pid,command | grep "[r]esque" | grep -v "resque-web"`.split("\n").map do |line|
line.split(' ')[0]
end
end
# Find Resque worker pids on Solaris.
#
# Returns an Array of string pids of all the other workers on this
# machine. Useful when pruning dead workers on startup.
def solaris_worker_pids
- `ps -A -o pid,comm | grep [r]uby | grep -v "resque-web"`.split("\n").map do |line|
+ `ps -A -o pid,comm | grep "[r]uby" | grep -v "resque-web"`.split("\n").map do |line|
real_pid = line.split(' ')[0]
pargs_command = `pargs -a #{real_pid} 2>/dev/null | grep [r]esque | grep -v "resque-web"`
if pargs_command.split(':')[1] == " resque-#{Resque::Version}"
real_pid
end