Sha256: 58588144b88ee7260cca87781da587019bff581f46b4696729769045bf37776e

Contents?: true

Size: 1.31 KB

Versions: 5

Compression:

Stored size: 1.31 KB

Contents

require 'beaneater'

module Quebert
  module Backend

    # Manage jobs on a Beanstalk queue out of process.
    #
    # Wraps a single tube obtained from a Beaneater::Pool and exposes the
    # put / reserve / peek operations used by the rest of Quebert.
    class Beanstalk
      # Serializes +job+ with #to_json and puts it on the tube.
      #
      # The optional positional arguments are, in order, priority, delay and
      # ttr. Each one that is not nil is forwarded to the tube under the
      # :pri, :delay and :ttr option keys respectively; nil values are left
      # out so the tube applies its own defaults.
      #
      # Returns whatever the tube's #put returns.
      def put(job, *args)
        priority, delay, ttr = args
        opts = { pri: priority, delay: delay, ttr: ttr }.reject { |_, value| value.nil? }
        @tube.put job.to_json, opts
      end

      # Reserves the next job from the tube and returns it as-is, without
      # wrapping it in a controller. +timeout+ is passed straight through to
      # the tube's #reserve.
      def reserve_without_controller(timeout=nil)
        @tube.reserve timeout
      end

      # Reserves the next job from the tube and wraps it, together with this
      # backend, in a Controller::Beanstalk.
      def reserve(timeout=nil)
        Controller::Beanstalk.new reserve_without_controller(timeout), self
      end

      # Peeks at the tube for a job in the given +state+ (this class uses
      # :ready, :delayed and :buried). The result is used as a truthy/falsy
      # "is there a job?" check by #drain!.
      def peek(state)
        @tube.peek state
      end

      # For testing purposes... I think there's a better way to do this though.
      #
      # Removes every ready, delayed and buried job from the tube.
      #
      # Jobs are deleted directly from the peeked handle rather than being
      # reserved first: per the beanstalkd protocol a client may delete
      # ready, delayed and buried jobs without reserving them, and a reserve
      # with no timeout would wait for a delayed job's delay to elapse
      # before it could be removed.
      def drain!
        [:ready, :delayed, :buried].each do |state|
          while (job = peek(state))
            job.delete
          end
        end
      end

      # Connects to the given beanstalkd +host+ (a single address or an
      # array of addresses) and selects the tube named +tube+.
      def initialize(host, tube)
        @pool = Beaneater::Pool.new Array(host)
        @tube = @pool.tubes[tube]
      end

      # Builds a backend from an options hash with :host and :tube keys.
      # When :host is missing, '127.0.0.1:11300' is used. The caller's hash
      # is not modified.
      def self.configure(opts={})
        host = opts[:host] || ['127.0.0.1:11300']
        new(host, opts[:tube])
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
quebert-2.0.2 lib/quebert/backend/beanstalk.rb
quebert-2.0.1 lib/quebert/backend/beanstalk.rb
quebert-2.0.0 lib/quebert/backend/beanstalk.rb
quebert-1.12.0 lib/quebert/backend/beanstalk.rb
quebert-1.11.0 lib/quebert/backend/beanstalk.rb