Sha256: 8aa2d72ae0e56340733ecc56704901a0ac22765ddfcc571051350ca431aa0420

Contents?: true

Size: 580 Bytes

Versions: 1

Compression:

Stored size: 580 Bytes

Contents

module Cloudist
  class JobQueue < BasicQueue
    attr_reader :prefetch

    def initialize(queue_name, opts={})
      @prefetch = opts.delete(:prefetch) || 1
      opts[:auto_delete] = false

      super(queue_name, opts)
    end

    def setup
      super
      @mq.prefetch(self.prefetch)
    end

    def subscribe(amqp_opts={}, opts={})
      amqp_opts[:ack] = true
      super(amqp_opts, opts) do |request|
        begin
          yield request if block_given?
        ensure
          request.ack unless amqp_opts[:auto_ack] == false
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
cloudist-0.0.2 lib/cloudist/job_queue.rb