Sha256: aff07824a7c91bc99be34b8072923abf7edf0ec58454f05598f186a377e88226

Contents?: true

Size: 1.64 KB

Versions: 1

Compression:

Stored size: 1.64 KB

Contents

module AMQP
  # Client-side handle for a single named queue.
  #
  # Every operation is performed by sending protocol frames through the
  # +server+ object supplied at construction and, where a reply is expected,
  # reading it back with +server.next_method+ / +server.next_payload+.
  class Queue
    attr_reader :name, :server

    # Delivery tag of the message most recently returned by #pop. It is reset
    # to nil at the start of every #pop and read by #ack.
    attr_accessor :delivery_tag

    # Sends a Queue::Declare frame for +name+.
    #
    # server - object used for all frame I/O (must respond to +send_frame+).
    # name   - queue name.
    # opts   - extra Declare fields; they override the defaults
    #          (:queue => name, :nowait => true).
    def initialize(server, name, opts = {})
      @server = server
      @opts   = opts
      @name   = name
      server.send_frame(
        Protocol::Queue::Declare.new({ :queue => name, :nowait => true }.merge(opts))
      )
    end

    # Sends a Queue::Delete frame for this queue.
    #
    # opts - extra Delete fields; they override the defaults
    #        (:queue => name, :nowait => true).
    #
    # Returns the reply read from the server when :nowait was overridden to a
    # falsy value, otherwise nil.
    def delete(opts = {})
      args = { :queue => name, :nowait => true }.merge(opts)
      server.send_frame(Protocol::Queue::Delete.new(args))
      # A :nowait request gets no reply from the broker, so only read one when
      # the caller explicitly asked for it. The reply is returned, not printed.
      server.next_method unless args[:nowait]
    end

    # Fetches a single message with Basic::Get.
    #
    # opts - extra Get fields. The special key :ack (default falsy) is turned
    #        into the inverse :no_ack field and is not forwarded itself. The
    #        caller's hash is left untouched.
    #
    # Returns the message payload, or nil when the server answers GetEmpty.
    # Raises RuntimeError ('unexpected length') when the payload is shorter
    # than the size announced by the header.
    def pop(opts = {})
      opts   = opts.dup # work on a copy so the caller's hash is never mutated
      no_ack = !opts.delete(:ack)

      self.delivery_tag = nil
      server.send_frame(
        Protocol::Basic::Get.new({ :queue => name, :consumer_tag => name, :no_ack => no_ack, :nowait => true }.merge(opts))
      )
      method = server.next_method
      return if method.is_a?(Protocol::Basic::GetEmpty)

      # Remember the tag so a later #ack can reference this delivery.
      self.delivery_tag = method.delivery_tag

      header = server.next_payload
      msg    = server.next_payload
      raise 'unexpected length' if msg.length < header.size

      msg
    end

    # Sends a Basic::Ack frame carrying the current #delivery_tag.
    def ack
      server.send_frame(
        Protocol::Basic::Ack.new(:delivery_tag => delivery_tag)
      )
    end

    # Publishes +data+ through the memoized exchange built by #exchange.
    #
    # data - payload to publish.
    # opts - options handed to Exchange#publish unchanged.
    def publish(data, opts = {})
      exchange.publish(data, opts)
    end

    # Returns the message count reported by #status (one server round trip).
    def message_count
      status.first
    end

    # Returns the consumer count reported by #status (one server round trip).
    def consumer_count
      status.last
    end

    # Sends a passive Queue::Declare and reads the reply.
    #
    # opts - extra Declare fields; they override the defaults
    #        (:queue => name, :passive => true).
    # blk  - accepted for interface compatibility; currently unused.
    #
    # Returns [message_count, consumer_count] taken from the reply.
    def status(opts = {}, &blk)
      server.send_frame(
        Protocol::Queue::Declare.new({ :queue => name, :passive => true }.merge(opts))
      )
      method = server.next_method
      [method.message_count, method.consumer_count]
    end

  private

    # Lazily builds and memoizes the Exchange used by #publish: type :direct,
    # empty exchange name, with :key set to this queue's name.
    def exchange
      @exchange ||= Exchange.new(server, :direct, '', :key => name)
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
famoseagle-carrot-0.1.0 lib/amqp/queue.rb