lib/kthxbye/job.rb in kthxbye-1.0.1 vs lib/kthxbye/job.rb in kthxbye-1.0.2
- old
+ new
@@ -1,41 +1,54 @@
module Kthxbye
+
+ # This class is our main job runner. It also handles the instantiation and
+ # the meat of the job queuing and retreival.
class Job
include Helper
extend Helper
attr_accessor :id, :queue, :data, :worker
attr_reader :failed_attempts
- def self.add_to_queue(queue, id)
+ # Adds a job to the queue from a given job id
+ # and queue. useful for switching a job to another queue or adding a defined
+ # job to multiple queues.
+ def self.add_to_queue(id, queue)
redis.rpush( "queue:#{queue}", id )
end
- # insert a job into the queue
+ # The bulk of the job queuing method. Takes a string representing a queue
+ # name, a job class, and arguments to pass to the perform method of the
+ # job class. Returns a unique id for the job based on a redis "uniq_id" key
+ # (int) which is a simple incremented value. Queues the job in the given
+ # queue and the job in the data-store hash.
def self.create(queue, klass, *args)
raise "Need a queue to store job in" if queue.to_s.empty?
raise "No class to reference job type by" if klass.nil?
redis.incr :uniq_id
id = redis.get :uniq_id
Job.add_to_queue( queue, id )
Kthxbye.register_queue( queue )
-
+
+ # mark job as inactive currently. will mark active when job is getting run
+ redis.redis.sadd("jobs:inactive", id)
redis.hset( "data-store:#{queue}", id, encode( {:klass => klass, :payload => args} ) )
log "Created job in queue #{queue} with an unique key of #{id}"
return id.to_i
end
+ # Returns a job object for a given job id off of a given queue.
def self.find(id, queue)
data = decode( redis.hget( "data-store:#{queue}", id ) )
data ? Job.new(id, queue, data) : nil
end
- # removes all existence of this job and its data
- # returns the last known status of the job
+ # Removes all existence of this job and its data
+ # Returns the last known status of the job
def self.destroy(id, queue)
ret = Job.find(id, queue).status
# remove the element from the active queue
redis.lrem("queue:#{queue}", 0, id)
@@ -47,23 +60,30 @@
redis.hdel( :faulure, id )
return ret
end
- # instantiates a job for the worker to run
+ # Instantiates a job from a job id, a queue, and the job data.
+ # Most often used in the ::find method and for the worker to recreate
+ # the job for running.
def initialize(id, queue, data)
@id = id.to_i
@queue = queue
@data = data
@failed_attempts = Failure.fails_for_job(@id) # local tracking only, for rerun purposes
end
- # simply requeues a job
+ # Simply requeues the job to be rerun.
def rerun
Job.add_to_queue( @queue, @id )
end
+ # Returns the job's status. Will be one of 4 things.
+ # 1) :succeeded - the job ran and has a result
+ # 2) :failed - the job failed and reported an error
+ # 3) :active - job is being run.
+ # 4) :inactive - job is waiting to be run.
def status
if result
:succeeded
elsif Failure.find(@id)
:failed
@@ -72,52 +92,59 @@
else
:inactive
end
end
+ # Returns the job's result once it has been run.
def result
decode( redis.hget("result-store:#{@queue}", @id) )
end
- # simply removes this job from the active queue and places it
- # on the inactive list.
- # does not remove job payload
+ # Simply removes this job from the active queue and places it
+ # on the inactive list. Does not remove job payload from storage. It just
+ # removes its id from the actively run job queue.
def dequeue
redis.lrem("queue:#{@queue}", 0, @id)
inactive
end
- # does the heavy lifting of running a job
+ # Does all the heavy lifting of performing the job and storing the results.
+ # It will get the jobs class, payload and then run the job, storing the
+ # result in the result's store once complete. Also responsible for reporting
+ # errors and storing the job in the failure listing if an exception occurs.
def perform
begin
@klass = Object.const_get(@data['klass'])
@payload = @data['payload']
+ #set job active, getting ready to run
+ self.active
result = @klass.send(:perform, *@payload)
redis.hset( "result-store:#{@queue}", @id, encode( result ) )
return result
- rescue Exception => ex
+ rescue Object => ex
@failed_attempts += 1
log "Error occured: #{ex.message}. Try: #{@failed_attempts}/#{Kthxbye::Config.options[:attempts]}"
return Kthxbye::Failure.create( self, ex ) if @failed_attempts >= Kthxbye::Config.options[:attempts]
perform
end
end
- # will allow us to track when this job is being worked
+ # Removes the job from the inactive queue.
def active
redis.srem("jobs:inactive", @id)
end
def active?
!redis.sismember("jobs:inactive", @id)
end
+ # Places the job on the active queue
def inactive
redis.sadd("jobs:inactive", @id)
end
- def ==(obj)
+ def ==(obj) #:nodoc:
return false if obj.nil?
@data == obj.data &&
@id == obj.id &&
@queue == obj.queue
end