lib/resque/worker.rb in mongo-resque-1.19.0.1 vs lib/resque/worker.rb in mongo-resque-1.20.0

- old
+ new

@@ -22,17 +22,17 @@ attr_writer :to_s # Returns an array of all worker objects. def self.all - mongo_workers.distinct(:worker).map{|worker| find(worker)}.compact + mongo_workers.distinct(:worker).map { |worker| find(worker) }.compact end # Returns an array of all worker objects currently processing # jobs. def self.working - working = mongo_workers.find({ 'working_on' => { '$exists' => true}}).to_a.map{|w| find(w['worker'])} + mongo_workers.find('working_on' => { '$exists' => true }).to_a.map { |w| find(w['worker']) } end # Returns a single worker object. Accepts a string id. def self.find(worker_id) if exists? worker_id @@ -51,11 +51,11 @@ end # Given a string worker id, return a boolean indicating whether the # worker exists def self.exists?(worker_id) - mongo_workers.find({ :worker => worker_id.to_s}).count > 0 + mongo_workers.find(:worker => worker_id.to_s).count > 0 end # Workers should be initialized with an array of string queue # names. The order is important: a Worker will check the first # queue given for a job. If none is found, it will check the @@ -106,17 +106,18 @@ loop do break if shutdown? if not paused? and job = reserve log "got: #{job.inspect}" + job.worker = self run_hook :before_fork, job working_on job if @child = fork srand # Reseeding procline "Forked #{@child} at #{Time.now.to_i}" - Process.wait + Process.wait(@child) else procline "Processing #{job.queue} since #{Time.now.to_i}" perform(job, &block) exit! unless @cant_fork end @@ -138,10 +139,11 @@ # DEPRECATED. Processes a single job. If none is given, it will # try to produce one. Usually run in the child. def process(job = nil, &block) return unless job ||= reserve + job.worker = self working_on job perform(job, &block) ensure done_working end @@ -186,11 +188,11 @@ # Returns a list of queues to use when searching for a job. # A splat ("*") means you want every queue (in alpha order) - this # can be useful for dynamically adding new queues. def queues - @queues[0] == "*" ? Resque.queues.sort : @queues + @queues.map {|queue| queue == "*" ? Resque.queues.sort : queue }.flatten.uniq end # Not every platform supports fork. Here we do our magic to # determine if yours does. def fork @@ -364,11 +366,10 @@ end # Given a job, tells Mongo we're working on it. Useful for seeing # what workers are doing and when. def working_on(job) - job.worker = self data = { :queue => job.queue, :run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z"), :payload => job.payload } mongo_workers.update({:worker => self.to_s}, { '$set' => { 'working_on' => data}}, :upsert => true) end @@ -457,11 +458,11 @@ @hostname ||= `hostname`.chomp end # Returns Integer PID of running worker def pid - @pid ||= to_s.split(":")[1].to_i + Process.pid end # Returns an Array of string pids of all the other workers on this # machine. Useful when pruning dead workers on startup. def worker_pids @@ -475,20 +476,20 @@ # Find Resque worker pids on Linux and OS X. # # Returns an Array of string pids of all the other workers on this # machine. Useful when pruning dead workers on startup. def linux_worker_pids - `ps -A -o pid,command | grep [r]esque | grep -v "resque-web"`.split("\n").map do |line| + `ps -A -o pid,command | grep "[r]esque" | grep -v "resque-web"`.split("\n").map do |line| line.split(' ')[0] end end # Find Resque worker pids on Solaris. # # Returns an Array of string pids of all the other workers on this # machine. Useful when pruning dead workers on startup. def solaris_worker_pids - `ps -A -o pid,comm | grep [r]uby | grep -v "resque-web"`.split("\n").map do |line| + `ps -A -o pid,comm | grep "[r]uby" | grep -v "resque-web"`.split("\n").map do |line| real_pid = line.split(' ')[0] pargs_command = `pargs -a #{real_pid} 2>/dev/null | grep [r]esque | grep -v "resque-web"` if pargs_command.split(':')[1] == " resque-#{Resque::Version}" real_pid end