lib/quebert/backend/beanstalk.rb in quebert-2.0.4 vs lib/quebert/backend/beanstalk.rb in quebert-3.0.0

- old
+ new

@@ -1,62 +1,91 @@ -require 'beaneater' +require "beaneater" +require "forwardable" module Quebert module Backend - # Manage jobs on a Beanstalk queue out of process class Beanstalk - def initialize(host, tube_name) - @host, @tube_name = host, tube_name + extend Forwardable + include Logging + + # A buffer time in seconds added to the Beanstalk TTR for Quebert to do + # its own job cleanup The job will perform based on the Beanstalk TTR, + # but Beanstalk hangs on to the job just a little longer so that Quebert + # can bury the job or schedule a retry with the appropriate delay + TTR_BUFFER = 5 + + attr_reader :host, :queue + attr_writer :queues + + def initialize(host, queue) + @host = host + @queue = queue + @queues = [] end - def put(job, *args) - priority, delay, ttr = args - opts = {} - opts[:pri] = priority unless priority.nil? - opts[:delay] = delay unless delay.nil? - opts[:ttr] = ttr unless ttr.nil? - tube.put job.to_json, opts + def self.configure(opts = {}) + new(opts.fetch(:host, "127.0.0.1:11300"), opts.fetch(:queue)) end def reserve_without_controller(timeout=nil) - tube.reserve timeout + watch_tubes + beanstalkd_tubes.reserve(timeout) end def reserve(timeout=nil) - Controller::Beanstalk.new reserve_without_controller(timeout), self + Controller::Beanstalk.new(reserve_without_controller(timeout)) end - def peek(state) - tube.peek state - end - # For testing purposes... I think there's a better way to do this though. def drain! while peek(:ready) do reserve_without_controller.delete end while peek(:delayed) do reserve_without_controller.delete end while peek(:buried) do - tube.kick + kick reserve_without_controller.delete end end - - def self.configure(opts={}) - opts[:host] ||= ['127.0.0.1:11300'] - new(opts[:host], opts[:tube]) + + # TODO add a queue param? + def_delegators :default_tube, :peek, :kick + + def put(job) + tube = beanstalkd_tubes[job.queue || queue] + tube.put(job.to_json, + :pri => job.priority, + :delay => job.delay, + :ttr => job.ttr + TTR_BUFFER) end private - def pool - @pool ||= Beaneater::Pool.new Array(@host) + + def default_tube + @default_tube ||= beanstalkd_tubes[queue] end - def tube - @tube ||= pool.tubes[@tube_name] + def beanstalkd_connection + @beanstalkd_connection ||= Beaneater.new(host) end + + def beanstalkd_tubes + beanstalkd_connection.tubes + end + + def watch_tubes + if queues != @watched_tube_names + @watched_tube_names = queues + logger.info "Watching beanstalkd queues #{@watched_tube_names.inspect}" + beanstalkd_tubes.watch!(*@watched_tube_names) + end + end + + def queues + @queues.empty? ? [queue] : @queues + end end end -end \ No newline at end of file +end