Sha256: b7e8a02057b4e86f21f9f218c9f513b476f1eae1e8dbd72aa526570effea444b

Contents?: true

Size: 583 Bytes

Versions: 4

Compression:

Stored size: 583 Bytes

Contents

module Cloudist
  # Queue specialisation of BasicQueue that is never auto-deleted, applies a
  # prefetch count during setup, and calls +ack+ on each request handed to a
  # subscriber once the subscriber's block has finished.
  class JobQueue < BasicQueue
    # Prefetch count passed to <tt>@mq.prefetch</tt> in #setup (defaults to 1).
    attr_reader :prefetch

    # Builds the queue.
    #
    # queue_name - forwarded unchanged to BasicQueue#initialize.
    # opts       - options Hash. <tt>:prefetch</tt> is consumed here (default 1)
    #              and is not forwarded; <tt>:auto_delete</tt> is always
    #              forced to +false+. Everything else is forwarded to
    #              BasicQueue#initialize.
    #
    # The caller's Hash is left untouched.
    def initialize(queue_name, opts={})
      # Work on a copy: deleting :prefetch / injecting :auto_delete on the
      # caller's hash would break callers that reuse one options hash for
      # several queues (the second queue would lose its :prefetch).
      opts = opts.dup
      @prefetch = opts.delete(:prefetch) || 1
      opts[:auto_delete] = false

      super(queue_name, opts)
    end

    # Runs BasicQueue#setup and then applies the configured prefetch count.
    # NOTE(review): @mq is not assigned in this class; presumably
    # BasicQueue#setup creates it -- confirm against BasicQueue.
    def setup
      super
      @mq.prefetch(self.prefetch)
    end

    # Subscribes through BasicQueue#subscribe, wrapping the given block so
    # that every request is acked after the block runs.
    #
    # amqp_opts - Hash forwarded to BasicQueue#subscribe. <tt>:ack</tt> is set
    #             to +true+ whenever it is missing or falsy. Passing
    #             <tt>:auto_ack => false</tt> disables the automatic
    #             +request.ack+ call, leaving acknowledgement to the block.
    # opts      - forwarded unchanged to BasicQueue#subscribe.
    #
    # Yields each request to the caller's block, if one was given.
    # The caller's amqp_opts Hash is left untouched.
    def subscribe(amqp_opts={}, opts={})
      # Copy instead of mutating the caller's hash; a falsy :ack is replaced
      # by true, exactly as `||=` did.
      amqp_opts = amqp_opts.merge(:ack => amqp_opts[:ack] || true)
      super(amqp_opts, opts) do |request|
        begin
          yield request if block_given?
        ensure
          # Runs even when the block raises, so the request is still acked
          # unless the caller opted out with :auto_ack => false.
          request.ack unless amqp_opts[:auto_ack] == false
        end
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
cloudist-0.1.2 lib/cloudist/job_queue.rb
cloudist-0.1.1 lib/cloudist/job_queue.rb
cloudist-0.1.0 lib/cloudist/job_queue.rb
cloudist-0.0.3 lib/cloudist/job_queue.rb