Sha256: 7167fea0f75b0a864b84f41819021f67baa8794bbeb105616e7774f2bf21172f

Contents?: true

Size: 622 Bytes

Versions: 2

Compression:

Stored size: 622 Bytes

Contents

module Cloudist
  class JobQueue < Cloudist::Queues::BasicQueue
    attr_reader :prefetch

    def initialize(queue_name, opts={})
      @prefetch = opts.delete(:prefetch) || 1
      opts[:auto_delete] = false

      super(queue_name, opts)
    end

    def setup
      super
      @mq.prefetch(self.prefetch)
    end

    def subscribe(amqp_opts={}, opts={})
      amqp_opts[:ack] ||= true
      super(amqp_opts, opts) do |request|
        begin
          yield request if block_given?
        ensure
          request.ack unless amqp_opts[:auto_ack] == false || Cloudist.closing?
        end
      end
    end
  end
end
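
The acknowledgement logic in subscribe above can be tried out without a broker. The following is a minimal sketch, not part of the gem: FakeRequest and handle are hypothetical stand-ins for the request object and for the block passed to super, and the closing argument stands in for Cloudist.closing?. It illustrates that the ensure clause acknowledges the request even when the handler raises, and skips the ack when :auto_ack is false or the client is shutting down.

```ruby
# Hypothetical stand-in for the request object handed to a subscriber.
# Only the #ack call used by JobQueue#subscribe is modelled.
class FakeRequest
  attr_reader :acked

  def initialize
    @acked = false
  end

  def ack
    @acked = true
  end
end

# Hypothetical helper mirroring the begin/ensure shape of JobQueue#subscribe.
# `closing` is an assumption standing in for Cloudist.closing?.
def handle(request, amqp_opts = {}, closing = false)
  yield request if block_given?
ensure
  # Runs whether or not the handler raised.
  request.ack unless amqp_opts[:auto_ack] == false || closing
end

# Normal case: the handler returns, the request is acknowledged.
ok = FakeRequest.new
handle(ok) { |_r| :done }

# Failing handler: the ack still happens, then the error propagates.
failed = FakeRequest.new
begin
  handle(failed) { |_r| raise "boom" }
rescue RuntimeError
  # swallowed here only so the script can continue
end

# Caller opted out with :auto_ack => false, so no ack is sent.
manual = FakeRequest.new
handle(manual, { :auto_ack => false }) { |_r| :done }

# Shutdown in progress: no ack is sent.
shutting_down = FakeRequest.new
handle(shutting_down, {}, true) { |_r| :done }

puts [ok.acked, failed.acked, manual.acked, shutting_down.acked].inspect
```

One consequence worth noting: because the ack sits in ensure, a job whose handler raises is still acknowledged and will not be redelivered by the broker unless the caller passes :auto_ack => false and acknowledges manually.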

Version data entries

2 entries across 2 versions & 1 rubygems

Version         Path
cloudist-0.2.1  lib/cloudist/queues/job_queue.rb
cloudist-0.2.0  lib/cloudist/queues/job_queue.rb