lib/client/utils.rb in stomp-1.2.16 vs lib/client/utils.rb in stomp-1.3.0

- old
+ new

@@ -69,37 +69,26 @@ if headers[:id] == nil headers[:id] = Digest::SHA1.hexdigest(destination) end end - # Register a receipt listener. - def register_receipt_listener(listener) - id = -1 - @id_mutex.synchronize do - id = @ids.to_s - @ids = @ids.succ + # Parse a stomp URL. + def parse_hosts(url) + hosts = [] + host_match = /stomp(\+ssl)?:\/\/#{URL_REPAT}/ + url.scan(host_match).each do |match| + host = {} + host[:ssl] = match[0] == "+ssl" ? true : false + host[:login] = match[3] || "" + host[:passcode] = match[4] || "" + host[:host] = match[5] + host[:port] = match[6].to_i + hosts << host end - @receipt_listeners[id] = listener - id + hosts end -# Parse a stomp URL. -def parse_hosts(url) - hosts = [] - host_match = /stomp(\+ssl)?:\/\/#{URL_REPAT}/ - url.scan(host_match).each do |match| - host = {} - host[:ssl] = match[0] == "+ssl" ? true : false - host[:login] = match[3] || "" - host[:passcode] = match[4] || "" - host[:host] = match[5] - host[:port] = match[6].to_i - hosts << host - end - hosts -end - # A very basic check of required arguments. def check_arguments!() first_host = @parameters && @parameters[:hosts] && @parameters[:hosts].first raise ArgumentError if first_host.nil? @@ -129,38 +118,57 @@ # For backward compatibility, some messages may already exist with no # subscription id, in which case we can attempt to synthesize one. set_subscription_id_if_missing(message.headers['destination'], message.headers) subscription_id = message.headers[:id] end - @listeners[subscription_id] + + listener = @listeners[subscription_id] + listener.call(message) if listener end - # Start a single listener thread. Misnamed I think. - def start_listeners() + # Register a receipt listener. + def register_receipt_listener(listener) + id = uuid + @receipt_listeners[id] = listener + id + end + + def find_receipt_listener(message) + listener = @receipt_listeners[message.headers['receipt-id']] + listener.call(message) if listener + end + + def create_listener_maps @listeners = {} @receipt_listeners = {} @replay_messages_by_txn = {} + @listener_map = Hash.new do |message| + @logger.on_miscerr(@connection.log_params, "Received unknown frame type: '#{message.command}'\n") + end + + @listener_map[Stomp::CMD_MESSAGE] = lambda {|message| find_listener(message) } + @listener_map[Stomp::CMD_RECEIPT] = lambda {|message| find_receipt_listener(message) } + @listener_map[Stomp::CMD_ERROR] = @error_listener + end + + # Start a single listener thread. Misnamed I think. + def start_listeners() + create_listener_maps + @listener_thread = Thread.start do - while true + loop do message = @connection.receive # AMQ specific behavior if message.nil? && (!@parameters[:reliable]) raise Stomp::Error::NilMessageError end - if message # message can be nil on rapid AMQ stop / start sequences - # OK, we have some real data - if message.command == Stomp::CMD_MESSAGE - if listener = find_listener(message) - listener.call(message) - end - elsif message.command == Stomp::CMD_RECEIPT - if listener = @receipt_listeners[message.headers['receipt-id']] - listener.call(message) - end - end - end - end # while true + + next unless message # message can be nil on rapid AMQ stop/start sequences + + @listener_map[message.command].call(message) + end + end end # method start_listeners end # class Client