Sha256: fbac79a7024d9b04e68cf92b722dcc9e5051e785a644e128e828f98f8014d3a9

Contents?: true

Size: 1.37 KB

Versions: 2

Compression:

Stored size: 1.37 KB

Contents

require_relative 'request'
require 'json'

# Top-level Freddy namespace; this file contributes only Freddy::Producer.
class Freddy
  # Publishes JSON-encoded messages over an AMQP channel and lets callers
  # subscribe to "message returned" notifications from the AMQP client.
  class Producer
    # Raised by #on_return when neither the default exchange nor the channel
    # responds to +on_return+.
    OnReturnNotImplemented = Class.new(NoMethodError)

    # Content type set on every message published by #produce.
    CONTENT_TYPE = 'application/json'.freeze

    # channel - object providing +default_exchange+ and +topic(name)+.
    # logger  - object providing +debug(message)+.
    def initialize(channel, logger)
      @channel = channel
      @logger = logger
      @exchange = channel.default_exchange
      @topic_exchange = channel.topic(Freddy::FREDDY_TOPIC_EXCHANGE_NAME)
    end

    # Serializes +payload+ with +to_json+ and publishes it twice: first to the
    # topic exchange, then to the default exchange.
    #
    # destination - value used as the message's +routing_key+.
    # payload     - any object responding to +to_json+.
    # properties  - extra publish options; +routing_key+ and +content_type+
    #               given here are overridden. The hash itself is not mutated.
    #
    # Returns whatever the default exchange's +publish+ returns.
    def produce(destination, payload, properties = {})
      @logger.debug("Producing message #{payload.inspect} to #{destination}")

      publish_options = properties.merge(routing_key: destination, content_type: CONTENT_TYPE)
      body = payload.to_json

      # Every publish call receives its own copy of the options hash.
      @topic_exchange.publish(body, publish_options.dup)
      @exchange.publish(body, publish_options.dup)
    end

    # Registers +block+ to be called with (reply_code, correlation_id) whenever
    # the AMQP client reports a returned message. The exchange-level hook
    # (Bunny) takes precedence over the channel-level hook (Hare).
    #
    # Raises OnReturnNotImplemented if neither hook is available.
    def on_return(&block)
      return subscribe_via_exchange(&block) if @exchange.respond_to?(:on_return)
      return subscribe_via_channel(&block) if @channel.respond_to?(:on_return)

      raise OnReturnNotImplemented, "AMQP implementation doesn't implement on_return"
    end

    private

    # Bunny: the return hook lives on the exchange and yields hash-like
    # return info and properties.
    def subscribe_via_exchange(&block)
      @exchange.on_return do |return_info, properties, _content|
        block.call(return_info[:reply_code], properties[:correlation_id])
      end
    end

    # Hare: the return hook lives on the channel, so returns originating from
    # the topic exchange are skipped and only the rest are forwarded.
    def subscribe_via_channel(&block)
      @channel.on_return do |reply_code, _reply_text, exchange_name, _routing_key, properties|
        next if exchange_name == Freddy::FREDDY_TOPIC_EXCHANGE_NAME

        block.call(reply_code, properties.correlation_id)
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 2 rubygems

Version Path
freddy-jruby-0.4.3 lib/freddy/producer.rb
freddy-0.4.3 lib/freddy/producer.rb