lib/quebert/backend/beanstalk.rb in quebert-2.0.4 vs lib/quebert/backend/beanstalk.rb in quebert-3.0.0
- old
+ new
@@ -1,62 +1,91 @@
-require 'beaneater'
+require "beaneater"
+require "forwardable"
module Quebert
module Backend
-
# Manage jobs on a Beanstalk queue out of process
class Beanstalk
- def initialize(host, tube_name)
- @host, @tube_name = host, tube_name
+ extend Forwardable
+ include Logging
+
+ # A buffer time in seconds added to the Beanstalk TTR for Quebert to do
+ # its own job cleanup The job will perform based on the Beanstalk TTR,
+ # but Beanstalk hangs on to the job just a little longer so that Quebert
+ # can bury the job or schedule a retry with the appropriate delay
+ TTR_BUFFER = 5
+
+ attr_reader :host, :queue
+ attr_writer :queues
+
+ def initialize(host, queue)
+ @host = host
+ @queue = queue
+ @queues = []
end
- def put(job, *args)
- priority, delay, ttr = args
- opts = {}
- opts[:pri] = priority unless priority.nil?
- opts[:delay] = delay unless delay.nil?
- opts[:ttr] = ttr unless ttr.nil?
- tube.put job.to_json, opts
+ def self.configure(opts = {})
+ new(opts.fetch(:host, "127.0.0.1:11300"), opts.fetch(:queue))
end
def reserve_without_controller(timeout=nil)
- tube.reserve timeout
+ watch_tubes
+ beanstalkd_tubes.reserve(timeout)
end
def reserve(timeout=nil)
- Controller::Beanstalk.new reserve_without_controller(timeout), self
+ Controller::Beanstalk.new(reserve_without_controller(timeout))
end
- def peek(state)
- tube.peek state
- end
-
# For testing purposes... I think there's a better way to do this though.
def drain!
while peek(:ready) do
reserve_without_controller.delete
end
while peek(:delayed) do
reserve_without_controller.delete
end
while peek(:buried) do
- tube.kick
+ kick
reserve_without_controller.delete
end
end
-
- def self.configure(opts={})
- opts[:host] ||= ['127.0.0.1:11300']
- new(opts[:host], opts[:tube])
+
+ # TODO add a queue param?
+ def_delegators :default_tube, :peek, :kick
+
+ def put(job)
+ tube = beanstalkd_tubes[job.queue || queue]
+ tube.put(job.to_json,
+ :pri => job.priority,
+ :delay => job.delay,
+ :ttr => job.ttr + TTR_BUFFER)
end
private
- def pool
- @pool ||= Beaneater::Pool.new Array(@host)
+
+ def default_tube
+ @default_tube ||= beanstalkd_tubes[queue]
end
- def tube
- @tube ||= pool.tubes[@tube_name]
+ def beanstalkd_connection
+ @beanstalkd_connection ||= Beaneater.new(host)
end
+
+ def beanstalkd_tubes
+ beanstalkd_connection.tubes
+ end
+
+ def watch_tubes
+ if queues != @watched_tube_names
+ @watched_tube_names = queues
+ logger.info "Watching beanstalkd queues #{@watched_tube_names.inspect}"
+ beanstalkd_tubes.watch!(*@watched_tube_names)
+ end
+ end
+
+ def queues
+ @queues.empty? ? [queue] : @queues
+ end
end
end
-end
\ No newline at end of file
+end