module Kthxbye # This class is our main job runner. It also handles the instantiation and # the meat of the job queuing and retreival. class Job include Helper extend Helper attr_accessor :id, :queue, :data, :worker attr_reader :failed_attempts # Adds a job to the queue from a given job id # and queue. useful for switching a job to another queue or adding a defined # job to multiple queues. def self.add_to_queue(id, queue) redis.rpush( "queue:#{queue}", id ) end # The bulk of the job queuing method. Takes a string representing a queue # name, a job class, and arguments to pass to the perform method of the # job class. Returns a unique id for the job based on a redis "uniq_id" key # (int) which is a simple incremented value. Queues the job in the given # queue and the job in the data-store hash. def self.create(queue, klass, *args) raise "Need a queue to store job in" if queue.to_s.empty? raise "No class to reference job type by" if klass.nil? redis.incr :uniq_id id = redis.get :uniq_id Job.add_to_queue( id, queue ) Kthxbye.register_queue( queue ) # mark job as inactive currently. will mark active when job is getting run redis.sadd("jobs:inactive", id) redis.hset( "data-store:#{queue}", id, encode( {:klass => klass.to_s, :payload => args} ) ) log "Created job in queue #{queue} with an unique key of #{id}" return id.to_i end # Returns a job object for a given job id off of a given queue. def self.find(id, queue) data = decode( redis.hget( "data-store:#{queue}", id ) ) data ? Job.new(id, queue, data) : nil end # Removes all existence of this job and its data # Returns the last known status of the job def self.destroy(id, queue) ret = Job.find(id, queue).status # remove the element from the active queue redis.lrem("queue:#{queue}", 0, id) # be sure we also remove it from the inactive queue redis.srem("queue:#{queue}:inactive", id) # remove the job's data as well redis.hdel("data-store:#{queue}", id) redis.hdel("result-store:#{queue}", id) redis.hdel( :faulure, id ) return ret end # Instantiates a job from a job id, a queue, and the job data. # Most often used in the ::find method and for the worker to recreate # the job for running. def initialize(id, queue, data) @id = id.to_i @queue = queue @data = data @failed_attempts = Failure.fails_for_job(@id) # local tracking only, for rerun purposes end # Simply requeues the job to be rerun. def rerun Job.add_to_queue( @id, @queue ) end # Returns the job's status. Will be one of 4 things. # 1) :succeeded - the job ran and has a result # 2) :failed - the job failed and reported an error # 3) :active - job is being run. # 4) :inactive - job is waiting to be run. def status if result :succeeded elsif Failure.find(@id) :failed elsif active? :active else :inactive end end # Returns the job's result once it has been run. def result decode( redis.hget("result-store:#{@queue}", @id) ) end # Simply removes this job from the active queue and places it # on the inactive list. Does not remove job payload from storage. It just # removes its id from the actively run job queue. def dequeue redis.lrem("queue:#{@queue}", 0, @id) inactive end # Does all the heavy lifting of performing the job and storing the results. # It will get the jobs class, payload and then run the job, storing the # result in the result's store once complete. Also responsible for reporting # errors and storing the job in the failure listing if an exception occurs. # Will also publish a message on the job.failed channel (Redis PUBSUB) with # the id of the failed job def perform begin @klass = Object.const_get(@data['klass']) @payload = @data['payload'] #set job active, getting ready to run self.active result = @klass.send(:perform, *@payload) redis.hset( "result-store:#{@queue}", @id, encode( result ) ) return result rescue Object => ex @failed_attempts += 1 log "Error occured: #{ex.message}. Try: #{@failed_attempts}/#{Kthxbye::Config.options[:attempts]}" redis.publish("job.failed", @id) return Kthxbye::Failure.create( self, ex ) if @failed_attempts >= Kthxbye::Config.options[:attempts] perform end end # Removes the job from the inactive queue. def active redis.srem("jobs:inactive", @id) end def active? !redis.sismember("jobs:inactive", @id) end # Places the job on the active queue def inactive redis.sadd("jobs:inactive", @id) end def ==(obj) #:nodoc: return false if obj.nil? @data == obj.data && @id == obj.id && @queue == obj.queue end end end