Sha256: 0e01900b4f369106d560ecbe92cc411205061c2beffa047a02ac2e86e544198c

Contents?: true

Size: 782 Bytes

Versions: 3

Compression:

Stored size: 782 Bytes

Contents

require 'pika_que/util'

module PikaQue
  # Encodes messages with the configured codec and publishes them to a single
  # exchange opened on its own channel.
  class Publisher

    # Builds the publisher: merges +opts+ over PikaQue.config, resolves the
    # codec, opens a channel and declares the exchange.
    #
    # @param opts [Hash] overrides for PikaQue.config. Keys read here:
    #   :codec, :connection, :exchange, :exchange_options.
    def initialize(opts = {})
      @opts = PikaQue.config.merge(opts)
      @codec = PikaQue::Util.constantize(@opts[:codec])
      # An explicitly supplied connection wins over the shared one.
      @connection = @opts[:connection] || PikaQue.connection
      @channel = @connection.create_channel
      @exchange = @channel.exchange(@opts[:exchange], @opts[:exchange_options])
    end

    # Encodes +msg+ and publishes it to the exchange.
    #
    # The caller's +options+ hash is never modified, so it may be frozen or
    # reused across calls.
    #
    # @param msg [Object] payload handed to the codec's +encode+.
    # @param options [Hash] publish options forwarded to the exchange.
    #   :to_queue is consumed here and used as :routing_key when no
    #   :routing_key is given; :content_type defaults to the codec's.
    # @return whatever the exchange's +publish+ returns.
    def publish(msg, options = {})
      # Copy first: the steps below delete and add keys, which must not leak
      # back into the caller's hash.
      publish_options = options.dup
      to_queue = publish_options.delete(:to_queue)
      publish_options[:routing_key] ||= to_queue
      publish_options[:content_type] ||= @codec.content_type
      payload = @codec.encode(msg)

      PikaQue.logger.info { "publishing <#{payload}> to [#{publish_options[:routing_key]}]" }
      @exchange.publish(payload, publish_options)
    end

    # @return the exchange name taken from the merged options (:exchange).
    def exchange_name
      @opts[:exchange]
    end

  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
pika_que-0.3.0 lib/pika_que/publisher.rb
pika_que-0.2.0 lib/pika_que/publisher.rb
pika_que-0.1.6 lib/pika_que/publisher.rb