# -*- encoding: utf-8 -*- require 'thread' require 'digest/sha1' module Stomp # Typical Stomp client class. Uses a listener thread to receive frames # from the server, any thread can send. # # Receives all happen in one thread, so consider not doing much processing # in that thread if you have much message volume. class Client attr_reader :login, :passcode, :host, :port, :reliable, :parameters #alias :obj_send :send # A new Client object can be initialized using two forms: # # Standard positional parameters: # login (String, default : '') # passcode (String, default : '') # host (String, default : 'localhost') # port (Integer, default : 61613) # reliable (Boolean, default : false) # # e.g. c = Client.new('login', 'passcode', 'localhost', 61613, true) # # Stomp URL : # A Stomp URL must begin with 'stomp://' and can be in one of the following forms: # # stomp://host:port # stomp://host.domain.tld:port # stomp://login:passcode@host:port # stomp://login:passcode@host.domain.tld:port # def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) # Parse stomp:// URL's or set params if login.is_a?(Hash) @parameters = login first_host = @parameters[:hosts][0] @login = first_host[:login] @passcode = first_host[:passcode] @host = first_host[:host] @port = first_host[:port] || Connection::default_port(first_host[:ssl]) @reliable = true elsif login =~ /^stomp:\/\/#{url_regex}/ # e.g. stomp://login:passcode@host:port or stomp://host:port @login = $2 || "" @passcode = $3 || "" @host = $4 @port = $5.to_i @reliable = false elsif login =~ /^failover:(\/\/)?\(stomp(\+ssl)?:\/\/#{url_regex}(,stomp(\+ssl)?:\/\/#{url_regex}\))+(\?(.*))?$/ # e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param first_host = {} first_host[:ssl] = !$2.nil? @login = first_host[:login] = $4 || "" @passcode = first_host[:passcode] = $5 || "" @host = first_host[:host] = $6 @port = first_host[:port] = $7.to_i || Connection::default_port(first_host[:ssl]) options = $16 || "" parts = options.split(/&|=/) options = Hash[*parts] hosts = [first_host] + parse_hosts(login) @parameters = {} @parameters[:hosts] = hosts @parameters.merge! filter_options(options) @reliable = true else @login = login @passcode = passcode @host = host @port = port.to_i @reliable = reliable end check_arguments! @id_mutex = Mutex.new @ids = 1 if @parameters @connection = Connection.new(@parameters) else @connection = Connection.new(@login, @passcode, @host, @port, @reliable) end start_listeners end # Syntactic sugar for 'Client.new' See 'initialize' for usage. def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) Client.new(login, passcode, host, port, reliable) end # Join the listener thread for this client, # generally used to wait for a quit signal def join(limit = nil) @listener_thread.join(limit) end # Begin a transaction by name def begin(name, headers = {}) @connection.begin(name, headers) end # Abort a transaction by name def abort(name, headers = {}) @connection.abort(name, headers) # lets replay any ack'd messages in this transaction replay_list = @replay_messages_by_txn[name] if replay_list replay_list.each do |message| if listener = find_listener(message) listener.call(message) end end end end # Commit a transaction by name def commit(name, headers = {}) txn_id = headers[:transaction] @replay_messages_by_txn.delete(txn_id) @connection.commit(name, headers) end # Subscribe to a destination, must be passed a block # which will be used as a callback listener # # Accepts a transaction header ( :transaction => 'some_transaction_id' ) def subscribe(destination, headers = {}) raise "No listener given" unless block_given? # use subscription id to correlate messages to subscription. As described in # the SUBSCRIPTION section of the protocol: http://stomp.github.com/. # If no subscription id is provided, generate one. set_subscription_id_if_missing(destination, headers) if @listeners[headers[:id]] raise "attempting to subscribe to a queue with a previous subscription" end @listeners[headers[:id]] = lambda {|msg| yield msg} @connection.subscribe(destination, headers) end # Unsubecribe from a channel def unsubscribe(name, headers = {}) set_subscription_id_if_missing(name, headers) @connection.unsubscribe(name, headers) @listeners[headers[:id]] = nil end # Acknowledge a message, used when a subscription has specified # client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client'g # # Accepts a transaction header ( :transaction => 'some_transaction_id' ) def acknowledge(message, headers = {}) txn_id = headers[:transaction] if txn_id # lets keep around messages ack'd in this transaction in case we rollback replay_list = @replay_messages_by_txn[txn_id] if replay_list.nil? replay_list = [] @replay_messages_by_txn[txn_id] = replay_list end replay_list << message end if block_given? headers['receipt'] = register_receipt_listener lambda {|r| yield r} end @connection.ack message.headers['message-id'], headers end # Stomp 1.1+ NACK def nack(message_id, headers = {}) @connection.nack message_id, headers end # Unreceive a message, sending it back to its queue or to the DLQ # def unreceive(message, options = {}) @connection.unreceive(message, options) end # Publishes message to destination # # If a block is given a receipt will be requested and passed to the # block on receipt # # Accepts a transaction header ( :transaction => 'some_transaction_id' ) def publish(destination, message, headers = {}) if block_given? headers['receipt'] = register_receipt_listener lambda {|r| yield r} end @connection.publish(destination, message, headers) end def obj_send(*args) __send__(*args) end def connection_frame @connection.connection_frame end def disconnect_receipt @connection.disconnect_receipt end # Is this client open? def open? @connection.open? end # Is this client closed? def closed? @connection.closed? end # Close out resources in use by this client def close headers={} @listener_thread.exit @connection.disconnect headers end # Check if the thread was created and isn't dead def running @listener_thread && !!@listener_thread.status end # Convenience method def set_logger(logger) @connection.set_logger(logger) end # Convenience method def protocol() @connection.protocol end # Convenience method def valid_utf8?(s) @connection.valid_utf8?(s) end # Convenience method for clients def sha1(data) @connection.sha1(data) end # Convenience method for clients def uuid() @connection.uuid() end private # Set a subscription id in the headers hash if one does not already exist. # For simplicities sake, all subscriptions have a subscription ID. # setting an id in the SUBSCRIPTION header is described in the stomp protocol docs: # http://stomp.github.com/ def set_subscription_id_if_missing(destination, headers) headers[:id] = headers[:id] ? headers[:id] : headers['id'] if headers[:id] == nil headers[:id] = Digest::SHA1.hexdigest(destination) end end def register_receipt_listener(listener) id = -1 @id_mutex.synchronize do id = @ids.to_s @ids = @ids.succ end @receipt_listeners[id] = listener id end # e.g. login:passcode@host:port or host:port def url_regex '(([\w\.\-]*):(\w*)@)?([\w\.\-]+):(\d+)' end def parse_hosts(url) hosts = [] host_match = /stomp(\+ssl)?:\/\/(([\w\.]*):(\w*)@)?([\w\.]+):(\d+)\)/ url.scan(host_match).each do |match| host = {} host[:ssl] = !match[0].nil? host[:login] = match[2] || "" host[:passcode] = match[3] || "" host[:host] = match[4] host[:port] = match[5].to_i hosts << host end hosts end def check_arguments! raise ArgumentError if @host.nil? || @host.empty? raise ArgumentError if @port.nil? || @port == '' || @port < 1 || @port > 65535 raise ArgumentError unless @reliable.is_a?(TrueClass) || @reliable.is_a?(FalseClass) end def filter_options(options) new_options = {} new_options[:initial_reconnect_delay] = (options["initialReconnectDelay"] || 10).to_f / 1000 # In ms new_options[:max_reconnect_delay] = (options["maxReconnectDelay"] || 30000 ).to_f / 1000 # In ms new_options[:use_exponential_back_off] = !(options["useExponentialBackOff"] == "false") # Default: true new_options[:back_off_multiplier] = (options["backOffMultiplier"] || 2 ).to_i new_options[:max_reconnect_attempts] = (options["maxReconnectAttempts"] || 0 ).to_i new_options[:randomize] = options["randomize"] == "true" # Default: false new_options[:backup] = false # Not implemented yet: I'm using a master X slave solution new_options[:timeout] = -1 # Not implemented yet: a "timeout(5) do ... end" would do the trick, feel free new_options end def find_listener(message) subscription_id = message.headers['subscription'] if subscription_id == nil # For backward compatibility, some messages may already exist with no # subscription id, in which case we can attempt to synthesize one. set_subscription_id_if_missing(message.headers['destination'], message.headers) subscription_id = message.headers[:id] end @listeners[subscription_id] end def start_listeners @listeners = {} @receipt_listeners = {} @replay_messages_by_txn = {} @listener_thread = Thread.start do while true message = @connection.receive if message.command == Stomp::CMD_MESSAGE if listener = find_listener(message) listener.call(message) end elsif message.command == Stomp::CMD_RECEIPT if listener = @receipt_listeners[message.headers['receipt-id']] listener.call(message) end end end end end end end