Sha256: 58588144b88ee7260cca87781da587019bff581f46b4696729769045bf37776e
Contents?: true
Size: 1.31 KB
Versions: 5
Compression:
Stored size: 1.31 KB
Contents
require 'beaneater' module Quebert module Backend # Manage jobs on a Beanstalk queue out of process class Beanstalk def put(job, *args) priority, delay, ttr = args opts = {} opts[:pri] = priority unless priority.nil? opts[:delay] = delay unless delay.nil? opts[:ttr] = ttr unless ttr.nil? @tube.put job.to_json, opts end def reserve_without_controller(timeout=nil) @tube.reserve timeout end def reserve(timeout=nil) Controller::Beanstalk.new reserve_without_controller(timeout), self end def peek(state) @tube.peek state end # For testing purposes... I think there's a better way to do this though. def drain! while peek(:ready) do reserve_without_controller.delete end while peek(:delayed) do reserve_without_controller.delete end while peek(:buried) do @tube.kick reserve_without_controller.delete end end def initialize(host, tube) @pool = Beaneater::Pool.new Array(host) @tube = @pool.tubes[tube] end def self.configure(opts={}) opts[:host] ||= ['127.0.0.1:11300'] new(opts[:host], opts[:tube]) end end end end
Version data entries
5 entries across 5 versions & 1 rubygems