Sha256: 60aa0a0e0b92bb905753c7dbf7c23f823ad49add53d8ed9155de79c5f6698754

Contents?: true

Size: 1.99 KB

Versions: 5

Compression:

Stored size: 1.99 KB

Contents

require 'stomp'

module RosettaQueue
  module Gateway
  
    # Gateway adapter that talks to a message broker through a single
    # Stomp::Connection held in @conn.
    #
    # Helpers used here but not defined in this class (destination_for,
    # options_for, filter_receiving) are expected to come from BaseAdapter.
    class StompAdapter < BaseAdapter

      # Acknowledges a received message, identified by its "message-id" header.
      #
      # @param msg [#headers] a message as returned by the connection's receive
      def ack(msg)
        @conn.ack(msg.headers["message-id"])
      end

      # Opens the underlying Stomp connection.
      #
      # @param adapter_settings [Hash] connection settings; the keys read are
      #   :user, :password, :host and :port
      # @raise [RuntimeError] if adapter_settings is empty
      def initialize(adapter_settings = {})
        raise "Missing adapter settings" if adapter_settings.empty?

        # NOTE(review): the trailing `true` is presumably the stomp gem's
        # "reliable" flag (fifth positional argument) -- confirm against the
        # installed gem version.
        @conn = Stomp::Connection.open(adapter_settings[:user],
                                       adapter_settings[:password],
                                       adapter_settings[:host],
                                       adapter_settings[:port],
                                       true)
      end

      # Unsubscribes from the handler's destination, then closes the connection.
      #
      # @param message_handler [Object] handler whose destination is resolved
      #   via destination_for
      def disconnect(message_handler)
        unsubscribe(destination_for(message_handler))
        @conn.disconnect
      end

      # Receives the next message from the connection, acknowledging it when
      # an :ack option is present.
      #
      # @param options [Hash] subscription options; any non-nil value under
      #   :ack triggers an acknowledgement
      # @return [Object] the message returned by the connection
      def receive(options)
        msg = @conn.receive
        ack(msg) unless options[:ack].nil?
        msg
      end

      # Subscribes to a destination, receives exactly one message, and
      # unsubscribes again.
      #
      # @param destination [String] destination to read from
      # @param opts [Hash] options passed to both subscribe and receive
      # @return [Object] the message body after filter_receiving
      def receive_once(destination, opts)
        subscribe(destination, opts)
        begin
          msg = receive(opts).body
        ensure
          # Always release the subscription, even when receiving or acking
          # fails; otherwise it would stay registered on the shared connection.
          # Caveat: if unsubscribe itself raises here, that error replaces the
          # original one.
          unsubscribe(destination)
        end
        RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
        filter_receiving(msg)
      end

      # Subscribes to the handler's destination and dispatches every incoming
      # message body to the handler. Runs inside `running`, so it does not
      # return unless the loop is broken out of or an error is raised.
      #
      # @param message_handler [#on_message] handler that receives each
      #   filtered message body
      def receive_with(message_handler)
        options = options_for(message_handler)
        destination = destination_for(message_handler)
        # Go through the adapter's own subscribe wrapper, as receive_once does.
        subscribe(destination, options)

        running do
          msg = receive(options).body
          RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
          message_handler.on_message(filter_receiving(msg))
        end
      end

      # Logs and publishes a message to a destination.
      #
      # @param destination [String] destination to publish to
      # @param message [Object] message payload
      # @param options [Hash] headers/options passed through to the connection
      def send_message(destination, message, options)
        RosettaQueue.logger.info("Publishing to #{destination} :: #{message}")
        # Stomp::Connection defines its own #send here (this is not a
        # Kernel#send dispatch).
        @conn.send(destination, message, options)
      end

      # Subscribes the connection to a destination.
      #
      # @param destination [String] destination to subscribe to
      # @param options [Hash] subscription headers/options
      def subscribe(destination, options)
        @conn.subscribe(destination, options)
      end

      # Removes the connection's subscription to a destination.
      #
      # @param destination [String] destination to unsubscribe from
      def unsubscribe(destination)
        @conn.unsubscribe(destination)
      end

      private

        # Repeats the given block indefinitely via Kernel#loop. Kept as a
        # separate method so the looping strategy has a single seam.
        def running(&block)
          loop(&block)
        end

    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygem

Version Path
cwyckoff-rosetta_queue-0.1.4 lib/rosetta_queue/adapters/stomp.rb
cwyckoff-rosetta_queue-0.2.0 lib/rosetta_queue/adapters/stomp.rb
cwyckoff-rosetta_queue-0.2.1 lib/rosetta_queue/adapters/stomp.rb
cwyckoff-rosetta_queue-0.2.2 lib/rosetta_queue/adapters/stomp.rb
cwyckoff-rosetta_queue-0.3.0 lib/rosetta_queue/adapters/stomp.rb