lib/client/utils.rb in stomp-1.2.16 vs lib/client/utils.rb in stomp-1.3.0
- old
+ new
@@ -69,37 +69,26 @@
if headers[:id] == nil
headers[:id] = Digest::SHA1.hexdigest(destination)
end
end
- # Register a receipt listener.
- def register_receipt_listener(listener)
- id = -1
- @id_mutex.synchronize do
- id = @ids.to_s
- @ids = @ids.succ
+ # Parse a stomp URL.
+ def parse_hosts(url)
+ hosts = []
+ host_match = /stomp(\+ssl)?:\/\/#{URL_REPAT}/
+ url.scan(host_match).each do |match|
+ host = {}
+ host[:ssl] = match[0] == "+ssl" ? true : false
+ host[:login] = match[3] || ""
+ host[:passcode] = match[4] || ""
+ host[:host] = match[5]
+ host[:port] = match[6].to_i
+ hosts << host
end
- @receipt_listeners[id] = listener
- id
+ hosts
end
-# Parse a stomp URL.
-def parse_hosts(url)
- hosts = []
- host_match = /stomp(\+ssl)?:\/\/#{URL_REPAT}/
- url.scan(host_match).each do |match|
- host = {}
- host[:ssl] = match[0] == "+ssl" ? true : false
- host[:login] = match[3] || ""
- host[:passcode] = match[4] || ""
- host[:host] = match[5]
- host[:port] = match[6].to_i
- hosts << host
- end
- hosts
-end
-
# A very basic check of required arguments.
def check_arguments!()
first_host = @parameters && @parameters[:hosts] && @parameters[:hosts].first
raise ArgumentError if first_host.nil?
@@ -129,38 +118,57 @@
# For backward compatibility, some messages may already exist with no
# subscription id, in which case we can attempt to synthesize one.
set_subscription_id_if_missing(message.headers['destination'], message.headers)
subscription_id = message.headers[:id]
end
- @listeners[subscription_id]
+
+ listener = @listeners[subscription_id]
+ listener.call(message) if listener
end
- # Start a single listener thread. Misnamed I think.
- def start_listeners()
+ # Register a receipt listener.
+ def register_receipt_listener(listener)
+ id = uuid
+ @receipt_listeners[id] = listener
+ id
+ end
+
+ def find_receipt_listener(message)
+ listener = @receipt_listeners[message.headers['receipt-id']]
+ listener.call(message) if listener
+ end
+
+ def create_listener_maps
@listeners = {}
@receipt_listeners = {}
@replay_messages_by_txn = {}
+ @listener_map = Hash.new do |message|
+ @logger.on_miscerr(@connection.log_params, "Received unknown frame type: '#{message.command}'\n")
+ end
+
+ @listener_map[Stomp::CMD_MESSAGE] = lambda {|message| find_listener(message) }
+ @listener_map[Stomp::CMD_RECEIPT] = lambda {|message| find_receipt_listener(message) }
+ @listener_map[Stomp::CMD_ERROR] = @error_listener
+ end
+
+ # Start a single listener thread. Misnamed I think.
+ def start_listeners()
+ create_listener_maps
+
@listener_thread = Thread.start do
- while true
+ loop do
message = @connection.receive
# AMQ specific behavior
if message.nil? && (!@parameters[:reliable])
raise Stomp::Error::NilMessageError
end
- if message # message can be nil on rapid AMQ stop / start sequences
- # OK, we have some real data
- if message.command == Stomp::CMD_MESSAGE
- if listener = find_listener(message)
- listener.call(message)
- end
- elsif message.command == Stomp::CMD_RECEIPT
- if listener = @receipt_listeners[message.headers['receipt-id']]
- listener.call(message)
- end
- end
- end
- end # while true
+
+ next unless message # message can be nil on rapid AMQ stop/start sequences
+
+ @listener_map[message.command].call(message)
+ end
+
end
end # method start_listeners
end # class Client