lib/bluth/gob.rb in bluth-0.5.3 vs lib/bluth/gob.rb in bluth-0.6.0

- old
+ new

@@ -1,180 +2 @@ - -module Bluth - - class Gob < Storable - MAX_ATTEMPTS = 3.freeze unless defined?(Gob::MAX_ATTEMPTS) - include Familia - prefix :gob - ttl 1.hour - field :id => Gibbler::Digest - field :kind => String - field :data => Hash - field :messages => Array - field :attempts => Integer - field :create_time => Float - field :stime => Float - field :etime => Float - field :current_queue => String - field :thread_id => Integer - field :cpu => Array - field :wid => Gibbler::Digest - - def self.inherited(obj) - obj.extend Bluth::Gob::ClassMethods - obj.prefix [:job, obj.to_s.split('::').last.downcase].join(':') - Bluth.handlers << obj - end - - module ClassMethods - def clear - keys.each do |key| - Gob.redis.del key - end - end - def enqueue(data={},q=nil) - q ||= self.queue - job = Gob.create generate_id(data), self, data - job.current_queue = q - Familia.ld "ENQUEUING: #{self} #{job.id.short} to #{q}" - Bluth::Queue.redis.lpush q.key, job.id - job.create_time = Time.now.utc.to_f - job.attempts = 0 - job - end - def queue(name=nil) - @queue = name if name - @queue || Bluth::High - end - def generate_id(*args) - a = [self, Process.pid, Bluth.sysinfo.hostname, Time.now.to_f, *args] - a.gibbler - end - def all - Bluth::Gob.all.select do |job| - job.kind == self - end - end - def size - all.size - end - def lock_key - Familia.key(prefix, :lock) - end - def lock! - raise Bluth::Buster, "#{self} is already locked!" if locked? - Familia.info "Locking #{self}" - ret = Bluth::Gob.redis.set lock_key, 1 - Bluth.locks << lock_key - ret == 'OK' - end - def unlock! - Familia.info "Unlocking #{self}" - ret = Bluth::Gob.redis.del lock_key - Bluth.locks.delete lock_key - ret - end - def locked? - Bluth::Gob.redis.exists lock_key - end - def prepare - end - - [:success, :failure, :running].each do |w| - define_method "#{w}_key" do # success_key - Familia.key(self.prefix, w) - end - define_method "#{w}!" do |*args| # success!(1) - by = args.first || 1 - Bluth::Gob.redis.incrby send("#{w}_key"), by - end - define_method "#{w}" do # success - Bluth::Gob.redis.get(send("#{w}_key")).to_i - end - end - end - - def id - @id = Gibbler::Digest.new(@id) if String === @id - end - def clear! - @attempts = 0 - @messages = [] - save - end - def preprocess - @attempts ||= 0 - @messages ||= [] - @create_time ||= Time.now.utc.to_f - end - def attempt? - attempts < MAX_ATTEMPTS - end - def attempt! - @attempts = attempts + 1 - end - def current_queue - @current_queue - end - def kind - @kind = eval "::#{@kind}" rescue @kind if @kind.is_a?(String) - @kind - end - def kind=(v) - @kind = v - end - def perform - @attempts += 1 - Familia.ld "PERFORM: #{self.to_hash.inspect}" - @stime = Time.now.utc.to_f - save # update the time - self.kind.prepare if self.class.respond_to?(:prepare) - self.kind.perform @data - @etime = Time.now.utc.to_f - save # update the time - end - def delayed? - start = @stime || 0 - start > Time.now.utc.to_f - end - def retry!(msg=nil) - move! Bluth::High, msg - end - def failure!(msg=nil) - @etime = Time.now.utc.to_i - self.kind.failure! - move! Bluth::Failed, msg - end - def success!(msg=nil) - @etime = Time.now.utc.to_i - self.kind.success! - move! Bluth::Successful, msg - end - def duration - return 0 if @stime.nil? - et = @etime || Time.now.utc.to_i - et - @stime - end - def dequeue! - Familia.ld "Deleting #{self.id} from #{current_queue.key}" - Bluth::Queue.redis.lrem current_queue.key, 0, self.id - end - private - def move!(to, msg=nil) - @thread_id = $$ - if to.to_s == current_queue.to_s - raise Bluth::Buster, "Cannot move job to the queue it's in: #{to}" - end - Familia.ld "Moving #{self.id.short} from #{current_queue.key} to #{to.key}" - @messages << msg unless msg.nil? || msg.empty? - # We push first to make sure we never lose a Gob ID. Instead - # there's the small chance of a job ID being in two queues. - Bluth::Queue.redis.lpush to.key, @id - dequeue! - save # update messages - @current_queue = to - end - end - -end -