lib/hornetq/client/requestor_pattern.rb in jruby-hornetq-0.3.3 vs lib/hornetq/client/requestor_pattern.rb in jruby-hornetq-0.4.0
- old
+ new
@@ -1,31 +1,112 @@
module HornetQ::Client
# Implements the Requestor Pattern
- # Send a request to a server and wait for a reply
+ # Send a request to a server and wait for a reply
+ # Parameters
+ # * session
+ # The session to use processing this request
+ # Note: Sessions cannot be shared concurrently by multiple threads
+ # * request_address
+ # Address to send requests to.
+ # It is expected that process listening to requests at this address has
+ # implemented the ServerPattern
+ # * reply_address
+ # If supplied the reply_address must already exist and will be used for
+ # receiving responses
+ # If not supplied a temporary queue will be created and used by this instance
+ # of the RequestorPattern
+ # This optional parameter is normally not used
+ # * reply_queue
+ # If a reply_address is supplied, the reply_queue name can be supplied if it
+ # differs from reply_address
class RequestorPattern
- def initialize(session, request_address)
+ def initialize(session, request_address, reply_address=nil, reply_queue=nil)
@session = session
@producer = session.create_producer(request_address)
- reply_queue = "#{request_address}.#{Java::java.util::UUID.randomUUID.toString}"
- begin
- session.create_temporary_queue(reply_queue, reply_queue)
- rescue NativeException => exc
- p exc
+ if reply_address
+ @reply_address = reply_address
+ @reply_queue = reply_queue || reply_address
+ @destroy_temp_queue = false
+ else
+ @reply_queue = @reply_address = "#{request_address}.#{Java::java.util::UUID.randomUUID.toString}"
+ begin
+ session.create_temporary_queue(@reply_address, @reply_queue)
+ @destroy_temp_queue = true
+ rescue NativeException => exc
+ p exc
+ end
end
- @consumer = session.create_consumer(reply_queue)
end
-
+
+ # Synchronous Request and wait for reply
+ #
+ # Returns the message received, or nil if no message was received in the
+ # specified timeout.
+ #
+ # The supplied request_message is updated as follows
+ # * The property JMSReplyTo is set to the name of the reply to address
+ # * Creates and sets the message user_id if not already set
+ # * #TODO: The expiry is set to the message timeout if not already set
+ #
+ # Note:
+ # * The request will only look for a reply message with the same
+ # user_id (message id) as the message that was sent. This is critical
+ # since a previous receive may have timed out and we do not want
+ # to pickup the reponse to an earlier request
+ #
+ # To receive a message after a timeout, call wait_for_reply with a nil message
+ # id to receive any message on the queue
+ #
+ # Use: submit_request & then wait_for_reply to break it into
+ # two separate calls
def request(request_message, timeout)
- request_message.putStringProperty(Java::OrgHornetqCoreClientImpl::ClientMessageImpl::REPLYTO_HEADER_NAME, @consumer.queue_name);
+ #TODO set message expiry to timeout if not already set
+ message_id = submit_request(request_message)
+ wait_for_reply(message_id, timeout)
+ end
+
+ # Asynchronous Request
+ # Use: submit_request & then wait_for_reply to break the request into
+ # two separate calls.
+ #
+ # For example, submit the request now, do some work, then later on
+ # in the same thread wait for the reply.
+ #
+ # The supplied request_message is updated as follows
+ # * The property JMSReplyTo is set to the name of the reply to address
+ # * Creates and sets the message user_id if not already set
+ # * #TODO: The expiry is set to the message timeout if not already set
+ #
+ # Returns Message id of the message that was sent
+ def submit_request(request_message)
+ request_message.reply_to_address = @reply_address
+ request_message.generate_user_id unless request_message.user_id
@producer.send(request_message)
- @consumer.receive(timeout)
+ request_message.user_id
end
+ # Asynchronous wait for reply
+ #
+ # Parameters:
+ # user_id: the user defined id to correlate a response for
+ #
+ # Supply a nil user_id to receive any message from the queue
+ #
+ # Returns the message received
+ #
+ # Note: Call submit_request before calling this method
+ def wait_for_reply(user_id, timeout)
+ # We only want the reply to the supplied message_id, so set filter on message id
+ filter = "#{Java::org.hornetq.api.core::FilterConstants::HORNETQ_USERID} = 'ID:#{user_id}'" if user_id
+ @session.consumer(:queue_name => @reply_queue, :filter=>filter) do |consumer|
+ consumer.receive(timeout)
+ end
+ end
+
def close
+ @session.delete_queue(@reply_queue) if @destroy_temp_queue
@producer.close if @producer
- @consumer.close if @consumer
- @session.delete_queue(@consumer.queue_name)
end
end
end
\ No newline at end of file