lib/hornetq/client/requestor_pattern.rb in jruby-hornetq-0.3.3 vs lib/hornetq/client/requestor_pattern.rb in jruby-hornetq-0.4.0

- old
+ new

@@ -1,31 +1,112 @@ module HornetQ::Client # Implements the Requestor Pattern - # Send a request to a server and wait for a reply + # Send a request to a server and wait for a reply + # Parameters + # * session + # The session to use processing this request + # Note: Sessions cannot be shared concurrently by multiple threads + # * request_address + # Address to send requests to. + # It is expected that process listening to requests at this address has + # implemented the ServerPattern + # * reply_address + # If supplied the reply_address must already exist and will be used for + # receiving responses + # If not supplied a temporary queue will be created and used by this instance + # of the RequestorPattern + # This optional parameter is normally not used + # * reply_queue + # If a reply_address is supplied, the reply_queue name can be supplied if it + # differs from reply_address class RequestorPattern - def initialize(session, request_address) + def initialize(session, request_address, reply_address=nil, reply_queue=nil) @session = session @producer = session.create_producer(request_address) - reply_queue = "#{request_address}.#{Java::java.util::UUID.randomUUID.toString}" - begin - session.create_temporary_queue(reply_queue, reply_queue) - rescue NativeException => exc - p exc + if reply_address + @reply_address = reply_address + @reply_queue = reply_queue || reply_address + @destroy_temp_queue = false + else + @reply_queue = @reply_address = "#{request_address}.#{Java::java.util::UUID.randomUUID.toString}" + begin + session.create_temporary_queue(@reply_address, @reply_queue) + @destroy_temp_queue = true + rescue NativeException => exc + p exc + end end - @consumer = session.create_consumer(reply_queue) end - + + # Synchronous Request and wait for reply + # + # Returns the message received, or nil if no message was received in the + # specified timeout. + # + # The supplied request_message is updated as follows + # * The property JMSReplyTo is set to the name of the reply to address + # * Creates and sets the message user_id if not already set + # * #TODO: The expiry is set to the message timeout if not already set + # + # Note: + # * The request will only look for a reply message with the same + # user_id (message id) as the message that was sent. This is critical + # since a previous receive may have timed out and we do not want + # to pickup the reponse to an earlier request + # + # To receive a message after a timeout, call wait_for_reply with a nil message + # id to receive any message on the queue + # + # Use: submit_request & then wait_for_reply to break it into + # two separate calls def request(request_message, timeout) - request_message.putStringProperty(Java::OrgHornetqCoreClientImpl::ClientMessageImpl::REPLYTO_HEADER_NAME, @consumer.queue_name); + #TODO set message expiry to timeout if not already set + message_id = submit_request(request_message) + wait_for_reply(message_id, timeout) + end + + # Asynchronous Request + # Use: submit_request & then wait_for_reply to break the request into + # two separate calls. + # + # For example, submit the request now, do some work, then later on + # in the same thread wait for the reply. + # + # The supplied request_message is updated as follows + # * The property JMSReplyTo is set to the name of the reply to address + # * Creates and sets the message user_id if not already set + # * #TODO: The expiry is set to the message timeout if not already set + # + # Returns Message id of the message that was sent + def submit_request(request_message) + request_message.reply_to_address = @reply_address + request_message.generate_user_id unless request_message.user_id @producer.send(request_message) - @consumer.receive(timeout) + request_message.user_id end + # Asynchronous wait for reply + # + # Parameters: + # user_id: the user defined id to correlate a response for + # + # Supply a nil user_id to receive any message from the queue + # + # Returns the message received + # + # Note: Call submit_request before calling this method + def wait_for_reply(user_id, timeout) + # We only want the reply to the supplied message_id, so set filter on message id + filter = "#{Java::org.hornetq.api.core::FilterConstants::HORNETQ_USERID} = 'ID:#{user_id}'" if user_id + @session.consumer(:queue_name => @reply_queue, :filter=>filter) do |consumer| + consumer.receive(timeout) + end + end + def close + @session.delete_queue(@reply_queue) if @destroy_temp_queue @producer.close if @producer - @consumer.close if @consumer - @session.delete_queue(@consumer.queue_name) end end end \ No newline at end of file