Sha256: c1ae61e832ab97e237ffff7148fb7fc012df939adc4fc3377b5136173935d119

Contents?: true

Size: 1.43 KB

Versions: 4

Compression:

Stored size: 1.43 KB

Contents

class MQ
  class Queue
    include AMQP
    
    def initialize mq, name, opts = {}
      @mq = mq
      @mq.queues[@name = name] ||= self
      @mq.callback{
        @mq.send Protocol::Queue::Declare.new({ :queue => name,
                                                :nowait => true }.merge(opts))
      }
    end
    attr_reader :name

    def bind exchange, opts = {}
      @mq.callback{
        @mq.send Protocol::Queue::Bind.new({ :queue => name,
                                             :exchange => exchange.respond_to?(:name) ? exchange.name : exchange,
                                             :routing_key => opts.delete(:key),
                                             :nowait => true }.merge(opts))
      }
      self
    end
    
    def subscribe opts = {}, &blk
      @on_msg = blk
      @mq.callback{
        @mq.send Protocol::Basic::Consume.new({ :queue => name,
                                                :consumer_tag => name,
                                                :no_ack => true,
                                                :nowait => true }.merge(opts))
      }
      self
    end

    def publish data, opts = {}
      exchange.publish(data, opts)
    end

    def receive headers, body
      if @on_msg
        @on_msg.call *(@on_msg.arity == 1 ? [body] : [headers, body])
      end
    end
  
    private
    
    def exchange
      @exchange ||= Exchange.new(@mq, :direct, '', :key => name)
    end
  end
end

Version data entries

4 entries across 4 versions & 2 rubygems

Version Path
tmm1-amqp-0.5.3 lib/mq/queue.rb
tmm1-amqp-0.5.5 lib/mq/queue.rb
amqp-0.5.3 lib/mq/queue.rb
amqp-0.5.5 lib/mq/queue.rb