Sha256: d58dd636906fecf5fc8dbb1a29b4fd127dd588c7fcf2f033652099393cb57715

Contents?: true

Size: 1.17 KB

Versions: 4

Compression:

Stored size: 1.17 KB

Contents

class MQ
  # Client-side handle for an AMQP exchange bound to an MQ channel object.
  #
  # An Exchange remembers its type, name and default routing key, asks the
  # broker to declare itself on construction (when needed), and publishes
  # messages through the owning +mq+ object.
  class Exchange
    include AMQP

    attr_reader :name, :type, :key

    # Builds the exchange handle and, unless the exchange is a default one,
    # queues an Exchange.Declare frame on +mq+.
    #
    # mq   - object that provides +callback+ (to defer work) and +send+
    #        (to transmit a protocol frame); presumably an MQ channel.
    # type - exchange type, interpolated into the "amq.<type>" check below.
    # name - exchange name.
    # opts - extra options; +:key+ is kept as the default routing key, and
    #        the whole hash is merged into the Declare arguments.
    def initialize(mq, type, name, opts = {})
      @mq = mq
      @type = type
      @name = name
      @key = opts[:key]

      # No declaration is sent for "amq.<type>" or the empty-named exchange
      # (presumably because the broker pre-declares those).
      return if name == "amq.#{type}" || name == ''

      @mq.callback do
        @mq.send Protocol::Exchange::Declare.new({ :exchange => name,
                                                   :type => type,
                                                   :nowait => true }.merge(opts))
      end
    end

    # Publishes +data+ to this exchange as three frames: Basic.Publish,
    # a content header, and the body.
    #
    # data - payload; converted with +to_s+ just before sending.
    # opts - per-message options. +:key+ overrides the exchange's default
    #        routing key; the remaining entries are merged into both the
    #        Publish arguments and the header properties.
    #
    # The caller's +opts+ hash is never modified: a private copy is taken
    # when publish is called, so the same hash can be reused across calls.
    #
    # Returns self, so calls can be chained.
    def publish(data, opts = {})
      # Work on a copy; deleting :key from the caller's hash (and doing so
      # later, inside the deferred block) would make a reused options hash
      # lose its routing key after the first publish.
      opts = opts.dup
      routing_key = opts.delete(:key) || @key

      @mq.callback do
        EM.next_tick do
          @mq.send Protocol::Basic::Publish.new({ :exchange => name,
                                                  :routing_key => routing_key }.merge(opts))

          body = data.to_s

          # The header carries the body size in bytes; String#length counts
          # characters on Ruby 1.9+, which is wrong for multibyte payloads.
          @mq.send Protocol::Header.new(Protocol::Basic,
                                        body.bytesize, { :content_type => 'application/octet-stream',
                                                         :delivery_mode => 1,
                                                         :priority => 0 }.merge(opts))
          @mq.send Frame::Body.new(body)
        end
      end
      self
    end
  end
end

Version data entries

4 entries across 4 versions & 2 rubygems

Version Path
tmm1-amqp-0.5.1 lib/mq/exchange.rb
tmm1-amqp-0.5.2 lib/mq/exchange.rb
amqp-0.5.1 lib/mq/exchange.rb
amqp-0.5.2 lib/mq/exchange.rb