Sha256: 15a57283eeda8bfc860f3bed8d6df14f43dae385969f4c0f751116cef73cec72

Contents?: true

Size: 1.22 KB

Versions: 5

Compression:

Stored size: 1.22 KB

Contents

# -*- encoding : utf-8 -*-
require 'amqp'

module Untied
  module Publisher
    module AMQP
      class Producer < BaseProducer
        # Publishes events to the 'untied' topic exchange through an AMQP
        # channel. Instances must be created inside a running EventMachine
        # reactor (see #check_em_reactor).

        # Builds the producer and selects the channel used for publishing.
        #
        #   opts: options hash, passed on unchanged to BaseProducer#initialize.
        #     :channel - channel used only when ::AMQP.channel is not set; the
        #     globally defined channel takes precedence over this option.
        #
        # When neither source provides a channel, @channel is not assigned
        # here and #on_exchange becomes a no-op.
        #
        # Raises RuntimeError if no EventMachine reactor is running.
        def initialize(opts={})
          super
          check_em_reactor

          # Resolve the channel exactly once so the value that is tested is
          # the very same value that gets stored.
          channel = ::AMQP.channel || opts[:channel]
          return unless channel

          # `say` is not defined in this class; presumably a logging helper
          # inherited from BaseProducer.
          say "Using defined AMQP.channel"
          @channel = channel
        end

        # Publish the given event.
        #   e: the event, an object which is going to be serialized and sent
        #   through the wire. It should respond to #to_json.
        #
        # Nothing is published when no channel is configured (see
        # #on_exchange). @routing_key is not assigned in this class;
        # presumably it is set by BaseProducer.
        def safe_publish(e)
          on_exchange do |exchange|
            exchange.publish(e.to_json, :routing_key => @routing_key)
          end
        end

        # Declares the 'untied' topic exchange (auto-deleted) on the
        # configured channel and hands it to the given block.
        #
        # Returns nil without calling the block when no channel is
        # configured.
        def on_exchange(&block)
          return unless @channel
          @channel.topic('untied', :auto_delete => true, &block)
        end

        # Ensures EventMachine is loaded and its reactor is running.
        #
        # Raises RuntimeError otherwise.
        def check_em_reactor
          unless defined?(EventMachine) && EM.reactor_running?
            raise "In order to use the producer you must be running inside an " + \
              "eventmachine loop"
          end
        end
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
untied-publisher-0.0.7.pre3 lib/untied-publisher/amqp/producer.rb
untied-publisher-0.0.7.pre2 lib/untied-publisher/amqp/producer.rb
untied-publisher-0.0.7.pre1 lib/untied-publisher/amqp/producer.rb
untied-publisher-0.0.7.pre lib/untied-publisher/amqp/producer.rb
untied-publisher-0.0.6 lib/untied-publisher/amqp/producer.rb