Sha256: 0f3dbb8c8fb75a392cb6fbf3c301a46000448df4d133ea5d1ec0f78617edff8e

Contents?: true

Size: 1.4 KB

Versions: 2

Compression:

Stored size: 1.4 KB

Contents

module RosettaQueue
  module Gateway

    module Fanout
      def fanout_name_for(destination)
        fanout_name = destination.gsub(/(topic|fanout)\/(.*)/, '\2')
        raise AdapterException, "Unable to discover fanout exchange. Cannot bind queue to exchange!" unless fanout_name
        fanout_name
      end
    end

    class Amqp < BaseAdapter

      def initialize(adapter_settings = {})
        raise AdapterException, "Missing adapter settings" if adapter_settings.empty?
        @adapter_settings = adapter_settings
      end

      def delete(destination, opts={})
        exchange_strategy_for(destination, opts).delete(destination)
      end

      def disconnect(message_handler)
        destination = destination_for(message_handler)
        exchange_strategy_for(destination).unsubscribe
      end

      def receive_once(destination, opts={})
        exchange_strategy_for(destination, opts).receive_once(destination) do |msg|
          return msg
        end
      end

      def receive_with(message_handler)
        options = options_for(message_handler)
        destination = destination_for(message_handler)
        exchange_strategy_for(destination, options).receive(destination, message_handler)
      end

      def send_message(destination, message, options=nil)
        exchange_strategy_for(destination, options).publish(destination, message)
      end

      def unsubscribe; end

    end
  end
end

Version data entries

2 entries across 2 versions & 2 rubygems

Version Path
bmabey-rosetta_queue-0.3.3 lib/rosetta_queue/adapters/amqp.rb
rosetta_queue-0.4.0 lib/rosetta_queue/adapters/amqp.rb