Sha256: 4cb5629a236fe343fa2b6cd139517e8b0693221855c4c5faa6f1eae8e92d9725

Contents?: true

Size: 866 Bytes

Versions: 1

Compression:

Stored size: 866 Bytes

Contents

require 'beanstalk-client'

# JobQueue adapter backed by beanstalkd, using the beanstalk-client gem's
# Beanstalk::Pool for both producing and consuming jobs.
class JobQueue::BeanstalkAdapter
  # @param options [Hash] adapter settings
  # @option options [String, Array<String>] :hosts a single address or a list
  #   of addresses (the value is wrapped and flattened before being handed to
  #   Beanstalk::Pool); defaults to 'localhost:11300'
  def initialize(options = {})
    @hosts = options[:hosts] || 'localhost:11300'
  end

  # Enqueues a job body on the named queue.
  #
  # @param string [String] the job body
  # @param queue [String] queue name, passed to Beanstalk::Pool as its
  #   second constructor argument
  # @param priority [Integer, nil] job priority, forwarded to the pool; when
  #   nil the client library's own default priority is used
  # @return whatever Beanstalk::Pool#put returns
  def put(string, queue, priority)
    pool = beanstalk_pool(queue)
    # A nil priority keeps the one-argument call so the client's default applies.
    return pool.put(string) if priority.nil?

    pool.put(string, priority)
  end

  # Consumes jobs from the named queue forever, yielding each job body to the
  # given block. This method does not return under normal operation.
  #
  # A job is deleted once the block has run, whether it succeeded or raised a
  # StandardError that was handed to +error_report+. Without the delete on the
  # success path the job would stay reserved and be handed out again later.
  # If +error_report+ itself raises, the job is not deleted and the exception
  # propagates out of the loop.
  #
  # @param error_report [#call] invoked as error_report.call(body, exception)
  #   when the block raises a StandardError
  # @param queue [String] queue name to consume from
  # @yieldparam body [String] the body of the reserved job
  def subscribe(error_report, queue, &block)
    # A dedicated pool is built per subscription rather than reusing the
    # memoized producer pool from #beanstalk_pool.
    pool = Beanstalk::Pool.new([@hosts].flatten, queue)
    loop do
      begin
        # The argument is passed straight to reserve; presumably a timeout in
        # seconds, after which Beanstalk::TimedOut is raised -- confirm
        # against the beanstalk-client docs.
        job = pool.reserve(1)
        JobQueue.logger.info "Beanstalk received #{job.body}"
        begin
          yield job.body
        rescue => e
          error_report.call(job.body, e)
        end
        # Reached on success and on reported failure alike.
        job.delete
      rescue Beanstalk::TimedOut
        # Do nothing - retry to reserve (from another host?)
      end
    end
  end

  # Returns the pool used for producing to +queue+, creating it on first use
  # and caching it per queue name for the lifetime of this adapter.
  #
  # @param queue [String] queue name
  # @return [Beanstalk::Pool]
  def beanstalk_pool(queue)
    @beanstalk_pools ||= {}
    @beanstalk_pools[queue] ||= Beanstalk::Pool.new([@hosts].flatten, queue)
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
mloughran-job_queue-0.0.5 lib/job_queue/adapters/beanstalk_adapter.rb