Sha256: cb31bd077eb893a19836d62560f10cd4c11ac94f7ac07a16e271f8169a80f685

Contents?: true

Size: 728 Bytes

Versions: 1

Compression:

Stored size: 728 Bytes

Contents

require 'pika_que/util'

module PikaQue
  # Publishes codec-encoded messages to a single exchange.
  #
  # On construction the publisher merges the supplied options over
  # +PikaQue.config+, resolves the codec named by +:codec+, opens a channel on
  # the connection and obtains the exchange it will publish to.
  class Publisher

    # @param opts [Hash] overrides merged on top of +PikaQue.config+.
    #   Keys read here:
    #   * +:codec+ - name resolved through +PikaQue::Util.constantize+; the
    #     result must respond to +content_type+ and +encode+.
    #   * +:connection+ - connection to use; falls back to
    #     +PikaQue.connection+ when absent or nil.
    #   * +:exchange+ / +:exchange_options+ - passed to +channel.exchange+.
    def initialize(opts = {})
      @opts = PikaQue.config.merge(opts)
      @codec = PikaQue::Util.constantize(@opts[:codec])
      @connection = @opts[:connection] || PikaQue.connection
      @channel = @connection.create_channel
      @exchange = @channel.exchange(@opts[:exchange], @opts[:exchange_options])
    end

    # Encodes +msg+ with the configured codec and publishes it to the exchange.
    #
    # The caller's +options+ hash is never modified: publishing works on a
    # shallow copy, so frozen or reused option hashes are safe to pass in.
    #
    # @param msg [Object] payload handed to the codec's +encode+.
    # @param options [Hash] publish options forwarded to the exchange.
    #   * +:to_queue+ - convenience key; it is removed before publishing and
    #     used as +:routing_key+ when no routing key was given.
    #   * +:routing_key+ - takes precedence over +:to_queue+ when set.
    #   * +:content_type+ - defaults to the codec's +content_type+.
    # @return whatever the exchange's +publish+ returns.
    def publish(msg, options = {})
      # Copy first so the delete/default assignments below stay local.
      publish_opts = options.dup
      to_queue = publish_opts.delete(:to_queue)
      publish_opts[:routing_key] ||= to_queue
      publish_opts[:content_type] ||= @codec.content_type
      msg = @codec.encode(msg)

      PikaQue.logger.info {"publishing <#{msg}> to [#{publish_opts[:routing_key]}]"}
      @exchange.publish(msg, publish_opts)
    end

  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
pika_que-0.1.5 lib/pika_que/publisher.rb