Sha256: a7fde47d840ae8d1df9eb82322516f9c8e705c227ed8e7f820107f3f0af81998

Contents?: true

Size: 1.43 KB

Versions: 2

Compression:

Stored size: 1.43 KB

Contents

require 'beaneater'

module Quebert
  module Backend

    # Manage jobs on a Beanstalk queue out of process.
    #
    # Wraps a single beaneater tube: jobs are serialized with +to_json+ when
    # they are put, and reserved jobs are handed back wrapped in a
    # Controller::Beanstalk. The connection pool and the tube are created
    # lazily on first use, so constructing a backend performs no network I/O.
    class Beanstalk
      # @param host [String, Array<String>] one or more beanstalkd addresses;
      #   coerced with Array() when the pool is built
      # @param tube_name [String] name of the tube this backend operates on
      def initialize(host, tube_name)
        @host = host
        @tube_name = tube_name
      end

      # Serializes +job+ with +to_json+ and puts it on the tube.
      #
      # Optional positional arguments are, in order: priority, delay, ttr.
      # Each one that is not nil is forwarded to the tube as the +:pri+,
      # +:delay+ and +:ttr+ option respectively; nil entries are left out so
      # the tube's own defaults apply.
      #
      # @param job [#to_json] the job to enqueue
      # @return whatever the underlying tube's +put+ returns
      def put(job, *args)
        priority, delay, ttr = args
        # Only nil is filtered out (not false), so explicit values always win.
        opts = { :pri => priority, :delay => delay, :ttr => ttr }.reject { |_, value| value.nil? }
        tube.put job.to_json, opts
      end

      # Reserves the next job from the tube and returns it unwrapped.
      #
      # @param timeout [Numeric, nil] forwarded untouched to the tube's
      #   +reserve+ (NOTE(review): presumably seconds, with nil meaning
      #   "wait indefinitely" -- confirm against beaneater)
      def reserve_without_controller(timeout=nil)
        tube.reserve timeout
      end

      # Reserves the next job and wraps it, together with this backend, in a
      # Controller::Beanstalk.
      #
      # @param timeout [Numeric, nil] see #reserve_without_controller
      def reserve(timeout=nil)
        Controller::Beanstalk.new reserve_without_controller(timeout), self
      end

      # Peeks at the next job in the given state without reserving it.
      #
      # @param state [Symbol] this class uses :ready, :delayed and :buried
      # @return the peeked job, or a falsy value when there is none
      def peek(state)
        tube.peek state
      end

      # Removes every ready, delayed and buried job from the tube. Intended
      # for tests.
      #
      # Each peeked job is deleted directly instead of being reserved first:
      # beanstalkd permits deleting ready, delayed and buried jobs by id, so
      # this never blocks waiting for a delayed job to become ready, needs no
      # kick for buried jobs, and always deletes exactly the job it peeked.
      def drain!
        [:ready, :delayed, :buried].each do |state|
          while (job = peek(state))
            job.delete
          end
        end
      end

      # Builds a backend from an options hash.
      #
      # The caller's hash is only read, never modified, so frozen or shared
      # option hashes are safe to pass.
      #
      # @param opts [Hash] +:host+ (defaults to ['127.0.0.1:11300'] when
      #   missing or nil) and +:tube+
      # @return [Beanstalk]
      def self.configure(opts={})
        host = opts[:host] || ['127.0.0.1:11300']
        new(host, opts[:tube])
      end

      private

      # Lazily built, memoized connection pool for the configured host(s).
      def pool
        @pool ||= Beaneater::Pool.new Array(@host)
      end

      # Lazily looked up, memoized tube named by @tube_name.
      def tube
        @tube ||= pool.tubes[@tube_name]
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
quebert-2.0.4 lib/quebert/backend/beanstalk.rb
quebert-2.0.3 lib/quebert/backend/beanstalk.rb