lib/bluth.rb in bluth-0.6.8 vs lib/bluth.rb in bluth-0.7.0
- old
+ new
@@ -86,40 +86,65 @@
Bluth::Queue.send n
end
require 'bluth/worker'
- module Queue # if this is a module the
+ module Queue
include Familia
prefix [:bluth, :queue]
class_list :critical #, :class => Bluth::Gob
class_list :high
class_list :low
class_list :running
class_list :successful
class_list :failed
class_list :orphaned
+ def self.create_queue name
+ unless queue(name)
+ q = Familia::List.new name, :parent => self
+ @queuecache[name.to_s.to_sym] = q
+ end
+ queue(name)
+ end
class << self
# The complete list of queues in the order they were defined
def queues
- Bluth::Queue.class_lists.collect(&:name).collect do |qname|
+ qs = Bluth::Queue.class_lists.collect(&:name).collect do |qname|
self.send qname
end
+ if defined?(Bluth::TimingBelt)
+ notch_queues = Bluth::TimingBelt.priority.collect { |notch| notch.queue }
+ qs.insert 1, *notch_queues
+ end
+ qs
end
# The subset of queues that new jobs arrive in, in order of priority
def entry_queues
- Bluth.priority.collect { |qname| self.send qname }
+ qs = Bluth.priority.collect { |qname| self.send qname }
+ if defined?(Bluth::TimingBelt)
+ notch_queues = Bluth::TimingBelt.priority.collect { |notch| notch.queue }
+ qs.insert 1, *notch_queues
+ end
+ qs
end
+ def queue name
+ if class_list? name.to_s.to_sym
+ self.send(name)
+ else
+ @queuecache ||= {}
+ @queuecache[name.to_s.to_sym]
+ end
+ end
end
# Set default priority
Bluth.priority = [:critical, :high, :low]
end
- # Workers use a blocking pop and will wait for up to
- # Bluth.queuetimeout (seconds) before returnning nil.
- # Note that the queues are still processed in order.
+ # Workers use a blocking pop and will wait for up to
+ # Bluth.queuetimeout (seconds) before returnning nil.
+ # Note that the queues are still processed in order.
# If all queues are empty, the first one to return a
# value is use. See:
#
# http://code.google.com/p/redis/wiki/BlpopCommand
def Bluth.shift
@@ -135,10 +160,11 @@
# +meth+ is either :blpop or :brpop
def Bluth.blocking_queue_handler meth
gob = nil
begin
order = Bluth::Queue.entry_queues.collect(&:rediskey)
+ Familia.ld " QUEUE ORDER: #{order.join(', ')}"
order << Bluth.queuetimeout # We do it this way to support Ruby 1.8
queue, gobid = *(Bluth::Queue.redis.send(meth, *order) || [])
unless queue.nil?
Familia.ld "FOUND #{gobid} id #{queue}" if Familia.debug?
gob = Gob.from_redis gobid
@@ -177,24 +203,39 @@
define_method "#{name}!" do
self.send(name).increment
end
end
- def enqueue(data={},q=nil)
- q = self.queue(q)
+ def engauge(data={}, notch=nil)
+ notch ||= Bluth::TimingBelt.notch 1
+ gob = create_job data
+ gob.notch = notch.name
+ gob.save
+ Familia.ld "ENNOTCHING: #{self} #{gob.jobid.short} to #{notch.rediskey}" if Familia.debug?
+ notch.add gob.jobid
+ gob
+ end
+
+ def create_job data={}
gob = Gob.create generate_id(data), self, data
- gob.current_queue = q.name
gob.created
gob.attempts = 0
+ gob
+ end
+
+ def enqueue(data={}, q=nil)
+ q = self.queue(q) if q.nil? || Symbol === q
+ gob = create_job data
+ gob.current_queue = q.name
gob.save
Familia.ld "ENQUEUING: #{self} #{gob.jobid.short} to #{q}" if Familia.debug?
q << gob.jobid
gob
end
def queue(name=nil)
@queue = name if name
- Bluth::Queue.send(@queue || :high)
+ Bluth::Queue.queue(@queue || :high)
end
def generate_id(*args)
[self, Process.pid, Bluth.sysinfo.hostname, Time.now.to_f, *args].gibbler
end
def all
@@ -220,9 +261,11 @@
field :handler => String
field :data => Hash
field :messages => Array
field :attempts => Integer
field :create_time => Float
+ field :backtrace
+ field :notch # populated only via TimingBelt
field :stime => Float
field :etime => Float
field :current_queue => Symbol
field :thread_id => Integer
field :cpu => Array