lib/bluth.rb in bluth-0.5.3 vs lib/bluth.rb in bluth-0.6.0
- old
+ new
@@ -1,12 +1,19 @@
# encoding: utf-8
BLUTH_LIB_HOME = File.expand_path File.dirname(__FILE__) unless defined?(BLUTH_LIB_HOME)
+local_libs = %w{familia}
+local_libs.each { |dir|
+ a = File.join(BLUTH_LIB_HOME, '..', '..', dir, 'lib')
+ $:.unshift a
+}
+
require 'sysinfo'
+require 'storable'
+require 'gibbler'
require 'familia'
-
module Bluth
module VERSION
def self.to_s
load_config
[@version[:MAJOR], @version[:MINOR], @version[:PATCH]].join('.')
@@ -24,21 +31,19 @@
class Buster < Familia::Problem; end
# A non-fatal error. Gob succeeds.
class Maeby < Familia::Problem; end
# A shutdown request. We burn down the banana stand.
class Shutdown < Familia::Problem; end
-
- @db = 15
- @queues = {}
- @poptimeout = 60.seconds
+ @db = 0
+ @poptimeout = 60 #.seconds
@handlers = []
@locks = []
@sysinfo = nil
@priority = []
@scheduler = nil
class << self
- attr_reader :queues, :handlers, :db, :conf, :locks
+ attr_reader :handlers, :db, :conf, :locks
attr_accessor :redis, :uri, :priority, :scheduler, :poptimeout
def sysinfo
@sysinfo ||= SysInfo.new.freeze
@sysinfo
end
@@ -48,164 +53,247 @@
@locks.each { |lock|
Familia.info "Removing lock #{lock}"
Bluth.redis.del lock
}
end
+ def Bluth.find_locks
+ @locks = Bluth.redis.keys(Familia.rediskey('*', :lock))
+ end
def Bluth.queue?(n)
- @queues.has_key?(n.to_sym)
+ Bluth::Queue.queues.collect(&:name).member?(n.to_s.to_sym)
end
def Bluth.queue(n)
- @queues[n.to_sym]
+ raise ArgumentError, "No such queue: #{n}" unless queue?(n)
+ Bluth::Queue.send n
end
- def Bluth.conf=(conf={})
- @conf = conf.clone
- @conf[:db] = @db
- connect!
- @conf
- end
+ require 'bluth/worker'
- def Bluth.connect!
- @uri = Redis.uri(@conf).freeze
- @redis = Familia.connect @uri
- end
-
- def Bluth.find_locks
- @locks = Bluth.redis.keys(Familia.key('*', :lock))
- end
-
- class Queue
+ module Queue # if this is a module the
include Familia
- prefix :queue
- def self.rangeraw(count=100)
- gobids = Queue.redis.lrange(key, 0, count-1) || []
- end
- def self.range(count=100)
- gobids = rangeraw count
- gobids.collect { |gobid|
- gob = Gob.from_redis gobid
- next if gob.nil?
- gob.current_queue = self
- gob
- }.compact
- end
- def self.dequeue(gobid)
- Queue.redis.lrem key, 0, gobid
- end
- def self.inherited(obj)
- obj.prefix self.prefix
- obj.suffix obj.to_s.split('::').last.downcase.to_sym
- raise Buster.new("Duplicate queue: #{obj.suffix}") if Bluth.queue?(obj.suffix)
- Bluth.queues[obj.suffix] = obj
- super(obj)
- end
- def self.key(pref=nil,suff=nil)
- Familia.key( pref || prefix, suff || suffix)
- end
- def self.report
- Bluth.queues.keys.collect { |q|
- klass = Bluth.queue(q)
- ("%10s: %4d" % [q, klass.size])
- }.join($/)
- end
- def self.from_string(str)
- raise Buster, "Unknown queue: #{str}" unless Bluth.queue?(str)
- Bluth.queue(str)
- end
- def self.any?
- size > 0
- end
-
- def self.empty?
- size == 0
- end
-
- def self.size
- begin
- Queue.redis.llen key
- rescue => ex
- STDERR.puts ex.message, ex.backtrace
- 0
+ prefix [:bluth, :queue]
+ class_list :critical #, :class => Bluth::Gob
+ class_list :high
+ class_list :low
+ class_list :running
+ class_list :successful
+ class_list :failed
+ class_list :orphaned
+ class << self
+ # The complete list of queues in the order they were defined
+ def queues
+ Bluth::Queue.class_lists.collect(&:name).collect do |qname|
+ self.send qname
+ end
end
- end
- def self.push(gobid)
- Queue.redis.lpush self.key, gobid
- end
-
- def self.pop
- gobid = Queue.redis.rpoplpush key, Bluth::Running.key
- return if gobid.nil?
- Familia.ld "FOUND gob #{gobid} from #{self.key}"
- gob = Gob.from_redis gobid
- if gob.nil?
- Familia.info "No such gob object: #{gobid}"
- Bluth::Running.dequeue gobid
- return
+ # The subset of queues that new jobs arrive in, in order of priority
+ def entry_queues
+ Bluth.priority.collect { |qname| self.send qname }
end
- gob.current_queue = Bluth::Running
- gob.save
- gob
end
+
+ # Set default priority
+ Bluth.priority = [:critical, :high, :low]
end
# Workers use a blocking pop and will wait for up to
# Bluth.poptimeout (seconds) before returnning nil.
# Note that the queues are still processed in order.
# If all queues are empty, the first one to return a
# value is use. See:
#
# http://code.google.com/p/redis/wiki/BlpopCommand
+ def Bluth.shift
+ blocking_queue_handler :blpop
+ end
+
def Bluth.pop
- #Bluth.priority.each { |queue|
- # ret = queue.pop
- # return ret unless ret.nil?
- #}
+ blocking_queue_handler :brpop
+ end
+
+ private
+
+ # +meth+ is either :blpop or :brpop
+ def Bluth.blocking_queue_handler meth
+ gob = nil
begin
- #Familia.ld :BRPOP, Queue.redis, self, caller[1] if Familia.debug?
- order = Bluth.priority.collect { |queue| queue.key }
+ order = Bluth::Queue.entry_queues.collect(&:rediskey)
order << Bluth.poptimeout # We do it this way to support Ruby 1.8
- gobinfo = Bluth::Queue.redis.brpop *order
- unless gobinfo.nil?
- Familia.info "FOUND #{gobinfo.inspect}" if Familia.debug?
- gob = Gob.from_redis gobinfo[1]
- raise Bluth::Buster, "No such gob object: #{gobinfo[1]}" if gob.nil?
- Bluth::Running.push gob.id
- gob.current_queue = Bluth::Running
+ queue, gobid = *(Bluth::Queue.redis.send(meth, *order) || [])
+ unless queue.nil?
+ Familia.ld "FOUND #{gobid} id #{queue}"
+ gob = Gob.from_redis gobid
+ raise Bluth::Buster, "No such gob object: #{gobid}" if gob.nil?
+ Bluth::Queue.running << gob.jobid
+ gob.current_queue = :running
gob.save
end
rescue => ex
- if gobinfo.nil?
+ if queue.nil?
Familia.info "ERROR: #{ex.message}"
else
- Familia.info "ERROR (#{ex.message}); putting #{gobinfo[1]} back on queue"
- Bluth::Orphaned.push gobinfo[1]
+ Familia.info "ERROR (#{ex.message}): #{gobid} is an orphan"
+ Bluth::Queue.orphaned << gobid
end
+ Familia.ld ex.backtrace
end
gob
+ end
+end
+
+
+
+module Bluth
+ module Handler
+
+ def self.extended(obj)
+ obj.send :include, Familia
+ obj.class_string :success
+ obj.class_string :failure
+ obj.class_string :running
+ Bluth.handlers << obj
+ end
+
+ [:success, :failure, :running].each do |name|
+ define_method "#{name}!" do
+ self.send(name).increment
+ end
+ end
+
+ def enqueue(data={},q=nil)
+ q = self.queue(q)
+ gob = Gob.create generate_id(data), self, data
+ gob.current_queue = q.name
+ gob.created
+ gob.attempts = 0
+ gob.save
+ Familia.ld "ENQUEUING: #{self} #{gob.jobid.short} to #{q}"
+ q << gob.jobid
+ gob
+ end
+ def queue(name=nil)
+ @queue = name if name
+ Bluth::Queue.send(@queue || :high)
+ end
+ def generate_id(*args)
+ [self, Process.pid, Bluth.sysinfo.hostname, Time.now.to_f, *args].gibbler
+ end
+ def all
+ Bluth::Gob.instances.select do |gob|
+ gob.handler == self
+ end
+ end
+ def prepare
+ end
+
end
-
- class Critical < Queue
- end
- class High < Queue
- end
- class Low < Queue
- end
- class Running < Queue
- end
- class Failed < Queue
- end
- class Successful < Queue
- end
- class Scheduled < Queue
- end
- class Orphaned < Queue
- end
+end
+
+
+module Bluth
+ class Gob < Storable
+ MAX_ATTEMPTS = 3.freeze unless defined?(Gob::MAX_ATTEMPTS)
+ include Familia
+ prefix [:bluth, :gob]
+ ttl 3600 #.seconds
+ index :jobid
+ field :jobid => Gibbler::Digest
+ field :handler => String
+ field :data => Hash
+ field :messages => Array
+ field :attempts => Integer
+ field :create_time => Float
+ field :stime => Float
+ field :etime => Float
+ field :current_queue => Symbol
+ field :thread_id => Integer
+ field :cpu => Array
+ field :wid => Gibbler::Digest
+ include Familia::Stamps
- require 'bluth/gob'
- require 'bluth/worker'
-
- Bluth.priority = [Bluth::Critical, Bluth::High, Bluth::Low]
- Bluth.scheduler = ScheduleWorker
+ def jobid
+ Gibbler::Digest.new(@jobid)
+ end
+ def clear!
+ @attempts = 0
+ @messages = []
+ save
+ end
+ def preprocess
+ @attempts ||= 0
+ @messages ||= []
+ @create_time ||= Time.now.utc.to_f
+ end
+ def attempt?
+ attempts < MAX_ATTEMPTS
+ end
+ def attempt!
+ @attempts = attempts + 1
+ end
+ def current_queue
+ @current_queue
+ end
+ def handler
+ eval "::#{@handler}" if @handler
+ end
+ def perform
+ @attempts += 1
+ Familia.ld "PERFORM: #{self.to_hash.inspect}"
+ @stime = Time.now.utc.to_f
+ save # update the time
+ self.handler.prepare if self.class.respond_to?(:prepare)
+ self.handler.perform @data
+ @etime = Time.now.utc.to_f
+ save # update the time
+ end
+ def delayed?
+ start = @stime || 0
+ start > Time.now.utc.to_f
+ end
+ def retry!(msg=nil)
+ move! :high, msg
+ end
+ def failure!(msg=nil)
+ @etime = Time.now.utc.to_i
+ self.handler.failure!
+ move! :failed, msg
+ end
+ def success!(msg=nil)
+ @etime = Time.now.utc.to_i
+ self.handler.success!
+ move! :successful, msg
+ end
+ def duration
+ return 0 if @stime.nil?
+ et = @etime || Time.now.utc.to_i
+ et - @stime
+ end
+ def queue
+ Bluth.queue(current_queue)
+ end
+ def dequeue!
+ Familia.ld "Deleting #{self.jobid} from #{queue.rediskey}"
+ queue.remove 0, self.jobid
+ end
+ def running!
+ move! :running
+ end
+ def move!(to, msg=nil)
+ @thread_id = $$
+ #if to.to_s == current_queue.to_s
+ # raise Bluth::Buster, "Cannot move job to the queue it's in: #{to}"
+ #end
+ from, to = Bluth.queue(current_queue), Bluth.queue(to)
+ Familia.ld "Moving #{self.jobid} from #{from.rediskey} to #{to.rediskey}"
+ @messages << msg unless msg.nil? || msg.empty?
+ # We push first to make sure we never lose a Gob ID. Instead
+ # there's the small chance of a job ID being in two queues.
+ to << @jobid
+ ret = from.remove @jobid, 0
+ @current_queue = to.name
+ save # update messages
+ end
+ end
end