lib/bluth.rb in bluth-0.6.8 vs lib/bluth.rb in bluth-0.7.0

- old
+ new

@@ -86,40 +86,65 @@ Bluth::Queue.send n end require 'bluth/worker' - module Queue # if this is a module the + module Queue include Familia prefix [:bluth, :queue] class_list :critical #, :class => Bluth::Gob class_list :high class_list :low class_list :running class_list :successful class_list :failed class_list :orphaned + def self.create_queue name + unless queue(name) + q = Familia::List.new name, :parent => self + @queuecache[name.to_s.to_sym] = q + end + queue(name) + end class << self # The complete list of queues in the order they were defined def queues - Bluth::Queue.class_lists.collect(&:name).collect do |qname| + qs = Bluth::Queue.class_lists.collect(&:name).collect do |qname| self.send qname end + if defined?(Bluth::TimingBelt) + notch_queues = Bluth::TimingBelt.priority.collect { |notch| notch.queue } + qs.insert 1, *notch_queues + end + qs end # The subset of queues that new jobs arrive in, in order of priority def entry_queues - Bluth.priority.collect { |qname| self.send qname } + qs = Bluth.priority.collect { |qname| self.send qname } + if defined?(Bluth::TimingBelt) + notch_queues = Bluth::TimingBelt.priority.collect { |notch| notch.queue } + qs.insert 1, *notch_queues + end + qs end + def queue name + if class_list? name.to_s.to_sym + self.send(name) + else + @queuecache ||= {} + @queuecache[name.to_s.to_sym] + end + end end # Set default priority Bluth.priority = [:critical, :high, :low] end - # Workers use a blocking pop and will wait for up to - # Bluth.queuetimeout (seconds) before returnning nil. - # Note that the queues are still processed in order. + # Workers use a blocking pop and will wait for up to + # Bluth.queuetimeout (seconds) before returnning nil. + # Note that the queues are still processed in order. # If all queues are empty, the first one to return a # value is use. See: # # http://code.google.com/p/redis/wiki/BlpopCommand def Bluth.shift @@ -135,10 +160,11 @@ # +meth+ is either :blpop or :brpop def Bluth.blocking_queue_handler meth gob = nil begin order = Bluth::Queue.entry_queues.collect(&:rediskey) + Familia.ld " QUEUE ORDER: #{order.join(', ')}" order << Bluth.queuetimeout # We do it this way to support Ruby 1.8 queue, gobid = *(Bluth::Queue.redis.send(meth, *order) || []) unless queue.nil? Familia.ld "FOUND #{gobid} id #{queue}" if Familia.debug? gob = Gob.from_redis gobid @@ -177,24 +203,39 @@ define_method "#{name}!" do self.send(name).increment end end - def enqueue(data={},q=nil) - q = self.queue(q) + def engauge(data={}, notch=nil) + notch ||= Bluth::TimingBelt.notch 1 + gob = create_job data + gob.notch = notch.name + gob.save + Familia.ld "ENNOTCHING: #{self} #{gob.jobid.short} to #{notch.rediskey}" if Familia.debug? + notch.add gob.jobid + gob + end + + def create_job data={} gob = Gob.create generate_id(data), self, data - gob.current_queue = q.name gob.created gob.attempts = 0 + gob + end + + def enqueue(data={}, q=nil) + q = self.queue(q) if q.nil? || Symbol === q + gob = create_job data + gob.current_queue = q.name gob.save Familia.ld "ENQUEUING: #{self} #{gob.jobid.short} to #{q}" if Familia.debug? q << gob.jobid gob end def queue(name=nil) @queue = name if name - Bluth::Queue.send(@queue || :high) + Bluth::Queue.queue(@queue || :high) end def generate_id(*args) [self, Process.pid, Bluth.sysinfo.hostname, Time.now.to_f, *args].gibbler end def all @@ -220,9 +261,11 @@ field :handler => String field :data => Hash field :messages => Array field :attempts => Integer field :create_time => Float + field :backtrace + field :notch # populated only via TimingBelt field :stime => Float field :etime => Float field :current_queue => Symbol field :thread_id => Integer field :cpu => Array