lib/kthxbye/job.rb in kthxbye-1.0.1 vs lib/kthxbye/job.rb in kthxbye-1.0.2

- old
+ new

@@ -1,41 +1,54 @@ module Kthxbye + + # This class is our main job runner. It also handles the instantiation and + # the meat of the job queuing and retreival. class Job include Helper extend Helper attr_accessor :id, :queue, :data, :worker attr_reader :failed_attempts - def self.add_to_queue(queue, id) + # Adds a job to the queue from a given job id + # and queue. useful for switching a job to another queue or adding a defined + # job to multiple queues. + def self.add_to_queue(id, queue) redis.rpush( "queue:#{queue}", id ) end - # insert a job into the queue + # The bulk of the job queuing method. Takes a string representing a queue + # name, a job class, and arguments to pass to the perform method of the + # job class. Returns a unique id for the job based on a redis "uniq_id" key + # (int) which is a simple incremented value. Queues the job in the given + # queue and the job in the data-store hash. def self.create(queue, klass, *args) raise "Need a queue to store job in" if queue.to_s.empty? raise "No class to reference job type by" if klass.nil? redis.incr :uniq_id id = redis.get :uniq_id Job.add_to_queue( queue, id ) Kthxbye.register_queue( queue ) - + + # mark job as inactive currently. will mark active when job is getting run + redis.redis.sadd("jobs:inactive", id) redis.hset( "data-store:#{queue}", id, encode( {:klass => klass, :payload => args} ) ) log "Created job in queue #{queue} with an unique key of #{id}" return id.to_i end + # Returns a job object for a given job id off of a given queue. def self.find(id, queue) data = decode( redis.hget( "data-store:#{queue}", id ) ) data ? Job.new(id, queue, data) : nil end - # removes all existence of this job and its data - # returns the last known status of the job + # Removes all existence of this job and its data + # Returns the last known status of the job def self.destroy(id, queue) ret = Job.find(id, queue).status # remove the element from the active queue redis.lrem("queue:#{queue}", 0, id) @@ -47,23 +60,30 @@ redis.hdel( :faulure, id ) return ret end - # instantiates a job for the worker to run + # Instantiates a job from a job id, a queue, and the job data. + # Most often used in the ::find method and for the worker to recreate + # the job for running. def initialize(id, queue, data) @id = id.to_i @queue = queue @data = data @failed_attempts = Failure.fails_for_job(@id) # local tracking only, for rerun purposes end - # simply requeues a job + # Simply requeues the job to be rerun. def rerun Job.add_to_queue( @queue, @id ) end + # Returns the job's status. Will be one of 4 things. + # 1) :succeeded - the job ran and has a result + # 2) :failed - the job failed and reported an error + # 3) :active - job is being run. + # 4) :inactive - job is waiting to be run. def status if result :succeeded elsif Failure.find(@id) :failed @@ -72,52 +92,59 @@ else :inactive end end + # Returns the job's result once it has been run. def result decode( redis.hget("result-store:#{@queue}", @id) ) end - # simply removes this job from the active queue and places it - # on the inactive list. - # does not remove job payload + # Simply removes this job from the active queue and places it + # on the inactive list. Does not remove job payload from storage. It just + # removes its id from the actively run job queue. def dequeue redis.lrem("queue:#{@queue}", 0, @id) inactive end - # does the heavy lifting of running a job + # Does all the heavy lifting of performing the job and storing the results. + # It will get the jobs class, payload and then run the job, storing the + # result in the result's store once complete. Also responsible for reporting + # errors and storing the job in the failure listing if an exception occurs. def perform begin @klass = Object.const_get(@data['klass']) @payload = @data['payload'] + #set job active, getting ready to run + self.active result = @klass.send(:perform, *@payload) redis.hset( "result-store:#{@queue}", @id, encode( result ) ) return result - rescue Exception => ex + rescue Object => ex @failed_attempts += 1 log "Error occured: #{ex.message}. Try: #{@failed_attempts}/#{Kthxbye::Config.options[:attempts]}" return Kthxbye::Failure.create( self, ex ) if @failed_attempts >= Kthxbye::Config.options[:attempts] perform end end - # will allow us to track when this job is being worked + # Removes the job from the inactive queue. def active redis.srem("jobs:inactive", @id) end def active? !redis.sismember("jobs:inactive", @id) end + # Places the job on the active queue def inactive redis.sadd("jobs:inactive", @id) end - def ==(obj) + def ==(obj) #:nodoc: return false if obj.nil? @data == obj.data && @id == obj.id && @queue == obj.queue end