module Kthxbye class Job include Helper extend Helper attr_accessor :id, :queue, :data, :worker attr_reader :failed_attempts def self.add_to_queue(queue, id) redis.rpush( "queue:#{queue}", id ) end # insert a job into the queue def self.create(queue, klass, *args) raise "Need a queue to store job in" if queue.to_s.empty? raise "No class to reference job type by" if klass.nil? redis.incr :uniq_id id = redis.get :uniq_id Job.add_to_queue( queue, id ) Kthxbye.register_queue( queue ) redis.hset( "data-store:#{queue}", id, encode( {:klass => klass, :payload => args} ) ) log "Created job in queue #{queue} with an unique key of #{id}" return id.to_i end def self.find(id, queue) data = decode( redis.hget( "data-store:#{queue}", id ) ) data ? Job.new(id, queue, data) : nil end # removes all existence of this job and its data # returns the last known status of the job def self.destroy(id, queue) ret = Job.find(id, queue).status # remove the element from the active queue redis.lrem("queue:#{queue}", 0, id) # be sure we also remove it from the inactive queue redis.srem("queue:#{queue}:inactive", id) # remove the job's data as well redis.hdel("data-store:#{queue}", id) redis.hdel("result-store:#{queue}", id) redis.hdel( :faulure, id ) return ret end # instantiates a job for the worker to run def initialize(id, queue, data) @id = id.to_i @queue = queue @data = data @failed_attempts = Failure.fails_for_job(@id) # local tracking only, for rerun purposes end # simply requeues a job def rerun Job.add_to_queue( @queue, @id ) end def status if result :succeeded elsif Failure.find(@id) :failed elsif active? :active else :inactive end end def result decode( redis.hget("result-store:#{@queue}", @id) ) end # simply removes this job from the active queue and places it # on the inactive list. # does not remove job payload def dequeue redis.lrem("queue:#{@queue}", 0, @id) inactive end # does the heavy lifting of running a job def perform begin @klass = Object.const_get(@data['klass']) @payload = @data['payload'] result = @klass.send(:perform, *@payload) redis.hset( "result-store:#{@queue}", @id, encode( result ) ) return result rescue Exception => ex @failed_attempts += 1 log "Error occured: #{ex.message}. Try: #{@failed_attempts}/#{Kthxbye::Config.options[:attempts]}" return Kthxbye::Failure.create( self, ex ) if @failed_attempts >= Kthxbye::Config.options[:attempts] perform end end # will allow us to track when this job is being worked def active redis.srem("jobs:inactive", @id) end def active? !redis.sismember("jobs:inactive", @id) end def inactive redis.sadd("jobs:inactive", @id) end def ==(obj) return false if obj.nil? @data == obj.data && @id == obj.id && @queue == obj.queue end end end