module Bluth class Gob < Storable MAX_ATTEMPTS = 3.freeze unless defined?(Gob::MAX_ATTEMPTS) include Familia prefix :gob ttl 1.hour field :id => Gibbler::Digest field :kind => String field :data => Hash field :messages => Array field :attempts => Integer field :create_time => Float field :stime => Float field :etime => Float field :current_queue => String field :thread_id => Integer field :cpu => Array field :wid => Gibbler::Digest def self.inherited(obj) obj.extend Bluth::Gob::ClassMethods obj.prefix [:job, obj.to_s.split('::').last.downcase].join(':') Bluth.handlers << obj end module ClassMethods def clear keys.each do |key| Gob.redis.del key end end def enqueue(data={},q=nil) q ||= self.queue job = Gob.create generate_id(data), self, data job.current_queue = q Familia.ld "ENQUEUING: #{self} #{job.id.short} to #{q}" Bluth::Queue.redis.lpush q.key, job.id job.create_time = Time.now.utc.to_f job.attempts = 0 job end def queue(name=nil) @queue = name if name @queue || Bluth::High end def generate_id(*args) a = [self, Process.pid, Bluth.sysinfo.hostname, Time.now.to_f, *args] a.gibbler end def all Bluth::Gob.all.select do |job| job.kind == self end end def size all.size end def lock_key Familia.key(prefix, :lock) end def lock! raise Bluth::Buster, "#{self} is already locked!" if locked? Familia.info "Locking #{self}" ret = Bluth::Gob.redis.set lock_key, 1 Bluth.locks << lock_key ret == 'OK' end def unlock! Familia.info "Unlocking #{self}" ret = Bluth::Gob.redis.del lock_key Bluth.locks.delete lock_key ret end def locked? Bluth::Gob.redis.exists lock_key end def prepare end [:success, :failure, :running].each do |w| define_method "#{w}_key" do # success_key Familia.key(self.prefix, w) end define_method "#{w}!" do |*args| # success!(1) by = args.first || 1 Bluth::Gob.redis.incrby send("#{w}_key"), by end define_method "#{w}" do # success Bluth::Gob.redis.get(send("#{w}_key")).to_i end end end def id @id = Gibbler::Digest.new(@id) if String === @id end def clear! @attempts = 0 @messages = [] save end def preprocess @attempts ||= 0 @messages ||= [] @create_time ||= Time.now.utc.to_f end def attempt? attempts < MAX_ATTEMPTS end def attempt! @attempts = attempts + 1 end def current_queue @current_queue end def kind @kind = eval "::#{@kind}" rescue @kind if @kind.is_a?(String) @kind end def kind=(v) @kind = v end def perform @attempts += 1 Familia.ld "PERFORM: #{self.to_hash.inspect}" @stime = Time.now.utc.to_f save # update the time self.kind.prepare if self.class.respond_to?(:prepare) self.kind.perform @data @etime = Time.now.utc.to_f save # update the time end def delayed? start = @stime || 0 start > Time.now.utc.to_f end def retry!(msg=nil) move! Bluth::High, msg end def failure!(msg=nil) @etime = Time.now.utc.to_i self.kind.failure! move! Bluth::Failed, msg end def success!(msg=nil) @etime = Time.now.utc.to_i self.kind.success! move! Bluth::Successful, msg end def duration return 0 if @stime.nil? et = @etime || Time.now.utc.to_i et - @stime end def dequeue! Familia.ld "Deleting #{self.id} from #{current_queue.key}" Bluth::Queue.redis.lrem current_queue.key, 0, self.id end private def move!(to, msg=nil) @thread_id = $$ if to.to_s == current_queue.to_s raise Bluth::Buster, "Cannot move job to the queue it's in: #{to}" end Familia.ld "Moving #{self.id.short} from #{current_queue.key} to #{to.key}" @messages << msg unless msg.nil? || msg.empty? # We push first to make sure we never lose a Gob ID. Instead # there's the small chance of a job ID being in two queues. Bluth::Queue.redis.lpush to.key, @id dequeue! save # update messages @current_queue = to end end end