Sha256: f647c2e63c58abf642d0d1e3142e8cbc983b6e263aa633c4c7d4ce3ea0b3c4f3

Contents?: true

Size: 1.2 KB

Versions: 4

Compression:

Stored size: 1.2 KB

Contents

class MQ
  class Exchange
    include AMQP

    def initialize mq, type, name, opts = {}
      @mq = mq
      @type, @name = type, name
      @mq.exchanges[@name = name] ||= self
      @key = opts[:key]

      @mq.callback{
        @mq.send Protocol::Exchange::Declare.new({ :exchange => name,
                                                   :type => type,
                                                   :nowait => true }.merge(opts))
      } unless name == "amq.#{type}" or name == ''
    end
    attr_reader :name, :type, :key

    def publish data, opts = {}
      @mq.callback{
        @mq.send Protocol::Basic::Publish.new({ :exchange => name,
                                                :routing_key => opts.delete(:key) || @key }.merge(opts))
      
        data = data.to_s

        @mq.send Protocol::Header.new(Protocol::Basic,
                                          data.length, { :content_type => 'application/octet-stream',
                                                         :delivery_mode => (opts.delete(:persistent) ? 2 : 1),
                                                         :priority => 0 }.merge(opts))
        @mq.send Frame::Body.new(data)
      }
      self
    end
  end
end

Version data entries

4 entries across 4 versions & 2 rubygems

Version Path
tmm1-amqp-0.5.3 lib/mq/exchange.rb
tmm1-amqp-0.5.5 lib/mq/exchange.rb
amqp-0.5.3 lib/mq/exchange.rb
amqp-0.5.5 lib/mq/exchange.rb