Sha256: 0f9d0fce38eb7e02cc5bce560f6b0a63dc8fa03ddd0a0a9ea11f74d4210d65c5
Contents?: true
Size: 1.79 KB
Versions: 1
Compression:
Stored size: 1.79 KB
Contents
require 'stomp'

module RosettaQueue
  module Gateway
    # Gateway adapter that sends and receives messages through a
    # Stomp::Connection held in @conn.
    #
    # The helpers +destination_for+, +options_for+ and +filter_receiving+ are
    # not defined in this file; they are expected to come from BaseAdapter or
    # something it mixes in.
    class StompAdapter < BaseAdapter
      # Acknowledges a received message, identified by its "message-id" header.
      #
      # @param msg [#headers] a message previously returned by the connection
      def ack(msg)
        @conn.ack(msg.headers["message-id"])
      end

      # Opens the underlying Stomp connection.
      #
      # @param user [String] login passed to Stomp::Connection.open
      # @param password [String] passcode passed to Stomp::Connection.open
      # @param host [String] broker host
      # @param port [Integer] broker port
      def initialize(user, password, host, port)
        # NOTE(review): the trailing `true` is the fifth positional argument of
        # Stomp::Connection.open -- presumably the gem's `reliable` flag;
        # confirm against the installed stomp version.
        @conn = Stomp::Connection.open(user, password, host, port, true)
      end

      # Unsubscribes from the handler's destination and then closes the
      # connection.
      #
      # @param message_handler [Object] handler whose destination is resolved
      #   through +destination_for+
      def disconnect(message_handler)
        unsubscribe(destination_for(message_handler))
        @conn.disconnect
      end

      # Receives the next message from the connection, acknowledging it when
      # the options carry a non-nil +:ack+ entry.
      #
      # @param options [Hash] subscription options; only +:ack+ is read here
      # @return [Object] the message as returned by +@conn.receive+
      def receive(options)
        msg = @conn.receive
        ack(msg) unless options[:ack].nil?
        msg
      end

      # Subscribes to +destination+, receives a single message, unsubscribes,
      # and returns the filtered message body.
      #
      # @param destination [String] destination to read from
      # @param opts [Hash] options forwarded to +subscribe+ and +receive+
      # @return [Object] result of +filter_receiving+ applied to the body
      def receive_once(destination, opts)
        subscribe(destination, opts)
        begin
          msg = receive(opts).body
        ensure
          # Always drop the one-shot subscription, even when receiving fails,
          # so a failed call does not leave the connection subscribed.
          unsubscribe(destination)
        end
        RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
        filter_receiving(msg)
      end

      # Subscribes to the handler's destination and then, in an endless loop,
      # hands every received (and filtered) message body to
      # +message_handler.on_message+.
      #
      # While a message is being handled, +Thread.current[:processing]+ is
      # +true+; it is reset to +false+ afterwards, including when the handler
      # raises.
      #
      # @param message_handler [#on_message] consumer of incoming messages
      def receive_with(message_handler)
        options     = options_for(message_handler)
        destination = destination_for(message_handler)
        subscribe(destination, options)

        running do
          msg = receive(options).body
          begin
            Thread.current[:processing] = true
            RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
            message_handler.on_message(filter_receiving(msg))
          ensure
            # Reset the flag on every exit path so an exception in the handler
            # cannot leave this thread permanently marked as processing.
            Thread.current[:processing] = false
          end
        end
      end

      # Logs and publishes +message+ to +destination+.
      #
      # @param destination [String] destination to publish to
      # @param message [Object] payload handed to +@conn.send+
      # @param options [Hash] headers/options handed to +@conn.send+
      def send_message(destination, message, options)
        RosettaQueue.logger.info("Publishing to #{destination} :: #{message}")
        @conn.send(destination, message, options)
      end

      # Subscribes the connection to +destination+.
      #
      # @param destination [String] destination to subscribe to
      # @param options [Hash] options forwarded to +@conn.subscribe+
      def subscribe(destination, options)
        @conn.subscribe(destination, options)
      end

      # Removes the connection's subscription to +destination+.
      #
      # @param destination [String] destination to unsubscribe from
      def unsubscribe(destination)
        @conn.unsubscribe(destination)
      end

      private

      # Runs the given block repeatedly via Kernel#loop.
      # NOTE(review): kept as its own method presumably so the endless loop
      # can be replaced in tests -- confirm with the specs.
      def running(&block)
        loop(&block)
      end
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
bmabey-rosetta_queue-0.1.3 | lib/rosetta_queue/adapters/stomp.rb |