module Kthxbye
  # This is the workhorse loop of the gem. Does all the dequeuing and running of jobs.
  # It mostly makes a bunch of calls to the job methods to run the job. It simply handles
  # the spawning off of processes to run the job and retry if necessary.
  class Worker
    include Helper
    extend Helper

    attr_accessor :sleep_for, :queues, :current_queue, :id

    # Creates a worker for running jobs off of a given queue.
    # Takes a queue or queues (csv style, e.g. test,other,bad) or the wildcard
    # (*) symbol to take in all queues in alphabetical order.
    # Optionally takes in an interval param on how long it waits to check the
    # queue for new jobs once it has exhausted the queue(s).
    def initialize(queues, sleep_for=5)
      setup_queues(queues)
      @sleep_for = sleep_for
    end

    # Normalizes the queues argument into an array: "*" expands to every known
    # queue (sorted), a csv string is split, anything else is wrapped in an array.
    def setup_queues(queues) # :nodoc:
      if queues == "*"
        @queues = Kthxbye.queues.sort
      elsif queues.include? ","
        @queues = queues.split(",").compact
      else
        @queues = *queues
      end
    end

    # Allows us to find a worker so that we can look at some of its internal data
    # later on.
    def self.find(worker)
      if exists? worker
        # Worker ids look like "host:pid:queue1,queue2" — the queues live in
        # the last colon-separated segment.
        qs = worker.split(':')[-1].split(",")
        new_worker = new(*qs)
        new_worker.id = worker
        return new_worker
      else
        nil
      end
    end

    # Checks if a worker is registered.
    def self.exists?(id)
      redis.sismember( :workers, id )
    end

    # Gets the job a given worker is working on
    # Returns a hash with the 'job_id' and the 'started' time
    def self.working_on(id)
      decode( redis.get( "worker:#{id}" ) )
    end

    # This is the major run loop. Workhorse of a worker... sort of.
    # In the end, this loop simply runs the jobs in separate processes by forking
    # out the process then waiting for it to return. we only process one.
    # Can optionally take in a block to run after the job has run.
    def run(&block)
      log "Starting Kthxbye::Worker #{self}"
      startup

      loop do
        break if @terminate

        # NOTE: && with explicit parens around the assignment — the original
        # relied on low-precedence `and` to make `job = grab_job` parse.
        if !@paused && (job = grab_job)
          log "Found job #{job}"
          working(job)
          @child = fork {
            log "Forking..."
            job.perform
            yield job if block_given?
            exit!
          }
          Process.wait
          done
        else
          # A zero interval means "run once and exit" rather than poll forever.
          break if @sleep_for == 0
          log "No jobs on #{@queues} - sleeping for #{@sleep_for}"
          sleep sleep_for.to_i
        end
      end
    ensure
      unregister_worker
    end

    # Returns the queues this worker is attached to in alphabetical order.
    def queues
      @queues.sort
    end

    # Run startup actions
    def startup #:nodoc:
      clean_workers
      register_worker
      register_signals
    end

    # This method cleans Redis of all workers that no longer exist that may have
    # been left over from a previous dirty shutdown (GC)
    def clean_workers
      workers = Kthxbye.workers
      known = worker_pids
      workers.each do |worker|
        host, pid, _queues = worker.id.split(":")
        # Only prune workers registered from this host whose pid is gone.
        next unless host == hostname
        next if known.include?(pid)
        log "Pruning unknown worker: #{worker}"
        worker.unregister_worker
      end
    end

    # Adds this worker to the worker registry
    def register_worker
      log "Registered worker #{self}"
      redis.sadd( :workers, self ) unless exists?
    end

    # Removes the worker from our worker registry
    def unregister_worker
      log "Unregistered worker #{self}"

      # If killed mid-job, record the failure and requeue the job so it is
      # not silently lost.
      if working?
        log "Was active. Reporting and rerunning"
        Failure.create(current_job, ActiveWorkerKilled.new)
        current_job.rerun
      end

      redis.del "worker:#{self}"
      redis.srem :workers, self
    end

    # Gets the current job this worker is working.
    def current_job
      return @current_job if @current_job
      data = decode( redis.get("worker:#{self}") )
      @current_job = Job.find( data['job_id'], @current_queue )
    end

    # Run when the job starts running
    def working(job) #:nodoc:
      redis.sadd( :working, self )
      data = encode( {:job_id => job.id, :started => Time.now.to_s} )
      redis.set("worker:#{self}", data)
      @current_job = job

      # activates job
      job.active
      redis.publish("job.started", job.id)
    end

    # Is this job working?
    def working?
      redis.sismember( :working, self )
    end

    # job complete actions
    def done #:nodoc:
      redis.srem( :working, self )
      redis.del( "worker:#{self}" )
      log "Completed job #{@current_job}"
      redis.publish("job.completed", @current_job.id)
      @current_job = nil
    end

    #
    # thanks to http://github.com/defunkt/resque/blob/master/lib/resque/worker.rb for these signals
    #
    def register_signals #:nodoc:
      trap('TERM') { shutdown! }
      trap('INT') { shutdown! }

      begin
        trap('QUIT') { shutdown }
        trap('USR1') { shutdown }
        trap('USR2') { log "Paused"; @paused = true }
        trap('CONT') { log "Unpaused"; @paused = false }
      rescue ArgumentError
        # Platforms (e.g. Windows) that lack these signals raise ArgumentError.
        warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
      end

      log "Registered signals"
    end

    # Shuts down the worker gracefully (once process has completed)
    def shutdown #:nodoc:
      log "Shutting down worker #{self}"
      @terminate = true
    end

    # Hard kills the worker by killing the process.
    def shutdown! #:nodoc:
      kill_child
      shutdown
    end

    def kill_child #:nodoc:
      if @child
        log "Killing child at #{@child}"
        if system("ps -o pid,state -p #{@child}")
          Process.kill("KILL", @child) rescue nil
        else
          log "Child #{@child} not found, restarting."
          shutdown
        end
      end
    end

    # Reserves a job off the queue
    def grab_job #:nodoc:
      job = nil
      @queues.each do |q|
        @current_queue = q
        log "Checking \"#{q}\" queue for jobs"
        job = Kthxbye.salvage(q)
        break unless job.nil?
      end

      return job || false
    end

    # Checks if this worker is registered.
    def exists? #:nodoc:
      redis.sismember( :workers, self )
    end

    # Returns the hostname of the machine this worker is running on
    def hostname
      @hostname ||= `hostname`.chomp
    end

    # Returns the process id of this worker.
    def pid
      Process.pid
    end

    # Returns an array of string pids of all the other workers on this
    # machine. Useful when pruning dead workers on startup.
    def worker_pids
      `ps -A -o pid,command | grep kthxbye`.split("\n").map do |line|
        line.split(' ')[0]
      end
    end

    # Returns a useful id with the hostname:pid:queues listing
    # Same return as to_s
    def id
      @id ||= "#{hostname}:#{pid}:#{queues.join(",")}"
    end
    alias_method :to_s, :id

    # nice inspect for the worker with the same info as #id
    def inspect
      "#<Worker #{id}>"
    end

    def ==(other) #:nodoc:
      to_s == other.to_s
    end

    def <=>(other) #:nodoc:
      to_s <=> other.to_s
    end
  end
end