Sha256: ec5dd588ad5d0d88a6b3d6f80a125396f8772785229235ca62be787c05b0c37d

Contents?: true

Size: 1.21 KB

Versions: 1

Compression:

Stored size: 1.21 KB

Contents

class MQ
  class Exchange
    include AMQP

    def initialize mq, type, name, opts = {}
      @mq = mq
      @type, @name = type, name
      @mq.exchanges[@name = name] ||= self
      @key = opts[:key]

      @mq.callback{
        @mq.send Protocol::Exchange::Declare.new({ :exchange => name,
                                                   :type => type,
                                                   :nowait => true }.merge(opts))
      } unless name == "amq.#{type}" or name == ''
    end
    attr_reader :name, :type, :key

    def publish data, opts = {}
      @mq.callback{
        out = []

        out << Protocol::Basic::Publish.new({ :exchange => name,
                                              :routing_key => opts.delete(:key) || @key }.merge(opts))
      
        data = data.to_s

        out << Protocol::Header.new(Protocol::Basic,
                                    data.length, { :content_type => 'application/octet-stream',
                                                   :delivery_mode => (opts.delete(:persistent) ? 2 : 1),
                                                   :priority => 0 }.merge(opts))

        out << Frame::Body.new(data)

        @mq.send *out
      }
      self
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
amqp-0.5.9 lib/mq/exchange.rb