lib/hornetq/client/factory.rb in jruby-hornetq-0.2.3.alpha vs lib/hornetq/client/factory.rb in jruby-hornetq-0.2.5.alpha

- old
+ new

@@ -1,378 +1,443 @@ -require 'uri' +module HornetQ + module Client -module HornetQ::Client + class Factory + # Create a new Factory and Session + # + # Creates a new factory and session, then passes the session to the supplied + # block. Upon completion the session and factory are both closed + # See Factory::initialize and Factory::create_session for the list + # of parameters + def self.session(params={},&proc) + raise "Missing mandatory code block" unless proc + factory = nil + session = nil + begin + if params.kind_of?(String) + # TODO: Support passing username and password from URI to Session + factory = self.new(params) + session = factory.session({}, &proc) + else + factory = self.new(params[:connector] || {}) + session = factory.session(params[:session] || {}, &proc) + end + ensure + session.close if session + factory.close if factory + end + end - class Factory - # Create a new Factory from which sessions can be created - # - # Parameters: - # * a Hash consisting of one or more of the named parameters - # * Summary of parameters and their default values - # HornetQ::Client::Factory.new( - # :uri => 'hornetq://localhost', - # :ack_batch_size => , - # :auto_group => , - # :block_on_acknowledge => , - # :block_on_durable_send => , - # :block_on_non_durable_send => , - # :cache_large_messages_client => , - # :call_timeout => , - # :client_failure_check_period => , - # :confirmation_window_size => , - # :connection_load_balancing_policy_class_name => , - # :connection_ttl => , - # :consumer_max_rate => , - # :consumer_window_size => , - # :discovery_address => , - # :discovery_initial_wait_timeout => , - # :discovery_port => , - # :discovery_refresh_timeout => , - # :failover_on_initial_connection => true, - # :failover_on_server_shutdown => true, - # :group_id => , - # :initial_message_packet_size => , - # :java_object => , - # :local_bind_address => , - # :max_retry_interval => , - # :min_large_message_size => , - # :pre_acknowledge => , - # :producer_max_rate => , - # :producer_window_size => , - # :reconnect_attempts => 1, - # :retry_interval => , - # :retry_interval_multiplier => , - # :scheduled_thread_pool_max_size => , - # :static_connectors => , - # :thread_pool_max_size => , - # :use_global_pools => - # ) - # - # Mandatory Parameters - # * :uri - # * The hornetq uri as to which server to connect with and which - # transport protocol to use. Format: - # hornetq://server:port,backupserver:port/?protocol=[netty|discover] - # * To use the default netty transport - # hornetq://server:port - # * To use the default netty transport and specify a backup server - # hornetq://server:port,backupserver:port - # * To use auto-discovery - # hornetq://server:port/?protocol=discovery - # * To use HornetQ within the current JVM - # hornetq://invm - # - # Optional Parameters - # * :ack_batch_size - # * :auto_group - # * :block_on_acknowledge - # * :block_on_durable_send - # * :block_on_non_durable_send - # * :cache_large_messages_client - # * :call_timeout - # * :client_failure_check_period - # * :confirmation_window_size - # * :connection_load_balancing_policy_class_name - # * :connection_ttl - # * :consumer_max_rate - # * :consumer_window_size - # * :discovery_address - # * :discovery_initial_wait_timeout - # * :discovery_port - # * :discovery_refresh_timeout - # * :failover_on_initial_connection - # * :failover_on_server_shutdown - # * :group_id - # * :initial_message_packet_size - # * :java_object - # * :local_bind_address - # * :max_retry_interval - # * :min_large_message_size - # * :pre_acknowledge - # * :producer_max_rate - # * :producer_window_size - # * :reconnect_attempts - # * :retry_interval - # * :retry_interval_multiplier - # * :scheduled_thread_pool_max_size - # * :static_connectors - # * :thread_pool_max_size - # * :use_global_pools - - def initialize(params={}) - HornetQ::Client.load_requirements - uri = nil - # TODO: Support :uri as an array for cluster configurations - if params.kind_of?(String) - uri = HornetQ::URI.new(params) - params = uri.params - else - raise "Missing :uri param in HornetQ::Server.create_server" unless params[:uri] - uri = HornetQ::URI.new(params.delete(:uri)) - # params override uri params - params = uri.params.merge(params) + # Create a new Factory along with a Session, and then start the session + # + # Creates a new factory and session, then passes the session to the supplied + # block. Upon completion the session and factory are both closed + # See Factory::initialize and Factory::create_session for the list + # of parameters + def self.start(params={},&proc) + session(params) do |session| + session.start + proc.call(session) + end end - - @factory = nil - # In-VM Transport has no fail-over or additional parameters - if uri.host == 'invm' - transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::INVM_CONNECTOR_CLASS_NAME) - @factory = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport) - elsif params[:protocol] - # Auto-Discovery just has a host name and port - if params[:protocol] == 'discovery' - @factory = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(uri.host, uri.port) - elsif params[:protocol] != 'netty' - raise "Unknown HornetQ protocol:#{params[:protocol]}" + + # Call the supplied code block after creating a factory instance + # See initialize for the parameter list + # The factory is closed before returning + # + # Returns the result of the code block + def self.create_factory(params={}, &proc) + raise "Missing mandatory code block" unless proc + factory = nil + result = nil + begin + factory=self.new(params) + result = proc.call(factory) + ensure + factory.close end + result end - # Unless already created, then the factory will use the netty protocol - unless @factory - # Primary Transport - transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.host, 'port' => uri.port }) + # Create a new Factory from which sessions can be created + # + # Parameters: + # * a Hash consisting of one or more of the named parameters + # * Summary of parameters and their default values + # HornetQ::Client::Factory.new( + # :uri => 'hornetq://localhost', + # :ack_batch_size => , + # :auto_group => , + # :block_on_acknowledge => , + # :block_on_durable_send => , + # :block_on_non_durable_send => , + # :cache_large_messages_client => , + # :call_timeout => , + # :client_failure_check_period => , + # :confirmation_window_size => , + # :connection_load_balancing_policy_class_name => , + # :connection_ttl => , + # :consumer_max_rate => , + # :consumer_window_size => , + # :discovery_address => , + # :discovery_initial_wait_timeout => , + # :discovery_port => , + # :discovery_refresh_timeout => , + # :failover_on_initial_connection => true, + # :failover_on_server_shutdown => true, + # :group_id => , + # :initial_message_packet_size => , + # :java_object => , + # :local_bind_address => , + # :max_retry_interval => , + # :min_large_message_size => , + # :pre_acknowledge => , + # :producer_max_rate => , + # :producer_window_size => , + # :reconnect_attempts => 1, + # :retry_interval => , + # :retry_interval_multiplier => , + # :scheduled_thread_pool_max_size => , + # :static_connectors => , + # :thread_pool_max_size => , + # :use_global_pools => + # ) + # + # Mandatory Parameters + # * :uri + # * The hornetq uri as to which server to connect with and which + # transport protocol to use. Format: + # hornetq://server:port,backupserver:port/?protocol=[netty|discover] + # * To use the default netty transport + # hornetq://server:port + # * To use the default netty transport and specify a backup server + # hornetq://server:port,backupserver:port + # * To use auto-discovery + # hornetq://server:port/?protocol=discovery + # * To use HornetQ within the current JVM + # hornetq://invm + # + # Optional Parameters + # * :ack_batch_size + # * :auto_group + # * :block_on_acknowledge + # * :block_on_durable_send + # * :block_on_non_durable_send + # * :cache_large_messages_client + # * :call_timeout + # * :client_failure_check_period + # * :confirmation_window_size + # * :connection_load_balancing_policy_class_name + # * :connection_ttl + # * :consumer_max_rate + # * :consumer_window_size + # * :discovery_address + # * :discovery_initial_wait_timeout + # * :discovery_port + # * :discovery_refresh_timeout + # * :failover_on_initial_connection + # * :failover_on_server_shutdown + # * :group_id + # * :initial_message_packet_size + # * :java_object + # * :local_bind_address + # * :max_retry_interval + # * :min_large_message_size + # * :pre_acknowledge + # * :producer_max_rate + # * :producer_window_size + # * :reconnect_attempts + # * :retry_interval + # * :retry_interval_multiplier + # * :scheduled_thread_pool_max_size + # * :static_connectors + # * :thread_pool_max_size + # * :use_global_pools - # Check for backup server connection information - if uri.backup_host - backup_transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.backup_host, 'port' => uri.backup_port }) - @factory = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport, backup_transport) + def initialize(params={}) + uri = nil + # TODO: Support :uri as an array for cluster configurations + if params.kind_of?(String) + uri = HornetQ::URI.new(params) + params = uri.params else + raise "Missing :uri param in HornetQ::Server.create_server" unless params[:uri] + uri = HornetQ::URI.new(params.delete(:uri)) + # params override uri params + params = uri.params.merge(params) + end + + @factory = nil + # In-VM Transport has no fail-over or additional parameters + if uri.host == 'invm' + transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::INVM_CONNECTOR_CLASS_NAME) @factory = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport) + elsif params[:protocol] + # Auto-Discovery just has a host name and port + if params[:protocol] == 'discovery' + @factory = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(uri.host, uri.port) + elsif params[:protocol] != 'netty' + raise "Unknown HornetQ protocol:#{params[:protocol]}" + end end + + # Unless already created, then the factory will use the netty protocol + unless @factory + # Primary Transport + transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.host, 'port' => uri.port }) + + # Check for backup server connection information + if uri.backup_host + backup_transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.backup_host, 'port' => uri.backup_port }) + @factory = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport, backup_transport) + else + @factory = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport) + end + end + + # If any other options were supplied, apply them to the created Factory instance + params.each_pair do |key, val| + next if key == :uri + method = key.to_s+'=' + if @factory.respond_to? method + @factory.send method, val + #puts "Debug: #{key} = #{@factory.send key}" if @factory.respond_to? key.to_sym + else + puts "Warning: Option:#{key}, with value:#{val} is invalid and being ignored" + end + end end - # If any other options were supplied, apply them to the created Factory instance - params.each_pair do |key, val| - next if key == :uri - method = key.to_s+'=' - if @factory.respond_to? method - @factory.send method, val - #puts "Debug: #{key} = #{@factory.send key}" if @factory.respond_to? key.to_sym - else - puts "Warning: Option:#{key}, with value:#{val} is invalid and being ignored" + # Create a new HornetQ session + # + # Note: Remember to close the session once it is no longer used. + # Recommend using #session with a block over this method where possible + # + # Note: + # * The returned session MUST be closed once complete + # factory = HornetQ::Client::Factory.new(:uri => 'hornetq://localhost/') + # session = factory.create_session + # ... + # session.close + # factory.close + # + # Returns: + # * A new HornetQ ClientSession + # * See org.hornetq.api.core.client.ClientSession for documentation on returned object + # + # Throws: + # * NativeException + # * ... + # + # Example: + # require 'hornetq' + # + # factory = nil + # session = nil + # begin + # factory = HornetQ::Client::Factory.new(:uri => 'hornetq://localhost/') + # session = factory.create_session + # + # # Create a new queue + # session.create_queue('Example', 'Example', true) + # + # # Create a producer to send messages + # producer = session.create_producer('Example') + # + # # Create a Text Message + # message = session.create_message(HornetQ::Client::Message::TEXT_TYPE,true) + # message.body_buffer.write_string('Hello World') + # + # # Send the message + # producer.send(message) + # ensure + # session.close if session + # factory.close if factory + # end + # + # Parameters: + # * a Hash consisting of one or more of the named parameters + # * Summary of parameters and their default values + # factory.create_session( + # :username => 'my_username', # Default is no authentication + # :password => 'password', # Default is no authentication + # :xa => false, + # :auto_commit_sends => true, + # :auto_commit_acks => true, + # :pre_acknowledge => false, + # :ack_batch_size => 1 + # ) + # + # Mandatory Parameters + # * None + # + # Optional Parameters + # * :username + # * The user name. To create an authenticated session + # + # * :password + # * The user password. To create an authenticated session + # + # * :xa + # * Whether the session supports XA transaction semantics or not + # + # * :auto_commit_sends + # * true: automatically commit message sends + # * false: commit manually + # + # * :auto_commit_acks + # * true: automatically commit message acknowledgement + # * false: commit manually + # + # * :pre_acknowledge + # * true: to pre-acknowledge messages on the server + # * false: to let the client acknowledge the messages + # * Note: It is possible to pre-acknowledge messages on the server so that the + # client can avoid additional network trip to the server to acknowledge + # messages. While this increases performance, this does not guarantee + # delivery (as messages can be lost after being pre-acknowledged on the + # server). Use with caution if your application design permits it. + # + # * :ack_batch_size + # * the batch size of the acknowledgements + # + def create_session(params={}) + raise "HornetQ::Client::Factory Already Closed" unless @factory + @factory.create_session( + params[:username], + params[:password], + params[:xa] || false, + params[:auto_commit_sends].nil? ? true : params[:auto_commit_sends], + params[:auto_commit_acks].nil? ? true : params[:auto_commit_acks], + params[:pre_acknowledge] || false, + params[:ack_batch_size] || 1) + end + + # Create a session, call the supplied block and once it completes + # close the session. + # See session_create for the Parameters + # + # Returns the result of the block + # + # Example + # HornetQ::Client::Factory.create_session(:uri => 'hornetq://localhost/') do |session| + # session.create_queue("Address", "Queue") + # end + # + # Example: + # require 'hornetq' + # + # factory = nil + # begin + # factory = HornetQ::Client::Factory.new(:uri => 'hornetq://localhost/') + # factory.create_session do |session| + # + # # Create a new queue + # session.create_queue('Example', 'Example', true) + # + # # Create a producer to send messages + # producer = session.create_producer('Example') + # + # # Create a Text Message + # message = session.create_message(HornetQ::Client::Message::TEXT_TYPE,true) + # message.body = 'Hello World' + # + # # Send the message + # producer.send(message) + # end + # ensure + # factory.close if factory + # end + def session(params={}, &proc) + raise "HornetQ::Client::session mandatory block missing" unless proc + session = nil + begin + session = create_session(params) + proc.call(session) + ensure + session.close if session end end - end - # Create a new HornetQ session - # - # Note: Remember to close the session once it is no longer used. - # Recommend using #session with a block over this method where possible - # - # Note: - # * The returned session MUST be closed once complete - # factory = HornetQ::Client::Factory.new(:uri => 'hornetq://localhost/') - # session = factory.create_session - # ... - # session.close - # factory.close - # - # Returns: - # * A new HornetQ ClientSession - # * See org.hornetq.api.core.client.ClientSession for documentation on returned object - # - # Throws: - # * NativeException - # * ... - # - # Example: - # require 'hornetq' - # - # factory = nil - # session = nil - # begin - # factory = HornetQ::Client::Factory.new(:uri => 'hornetq://localhost/') - # session = factory.create_session - # - # # Create a new queue - # session.create_queue('Example', 'Example', true) - # - # # Create a producer to send messages - # producer = session.create_producer('Example') - # - # # Create a Text Message - # message = session.create_message(HornetQ::Client::Message::TEXT_TYPE,true) - # message.body_buffer.write_string('Hello World') - # - # # Send the message - # producer.send(message) - # ensure - # session.close if session - # factory.close if factory - # end - # - # Parameters: - # * a Hash consisting of one or more of the named parameters - # * Summary of parameters and their default values - # factory.create_session( - # :username => 'my_username', # Default is no authentication - # :password => 'password', # Default is no authentication - # :xa => false, - # :auto_commit_sends => true, - # :auto_commit_acks => true, - # :pre_acknowledge => false, - # :ack_batch_size => 1 - # ) - # - # Mandatory Parameters - # * None - # - # Optional Parameters - # * :username - # * The user name. To create an authenticated session - # - # * :password - # * The user password. To create an authenticated session - # - # * :xa - # * Whether the session supports XA transaction semantics or not - # - # * :auto_commit_sends - # * true: automatically commit message sends - # * false: commit manually - # - # * :auto_commit_acks - # * true: automatically commit message acknowledgement - # * false: commit manually - # - # * :pre_acknowledge - # * true: to pre-acknowledge messages on the server - # * false: to let the client acknowledge the messages - # * Note: It is possible to pre-acknowledge messages on the server so that the - # client can avoid additional network trip to the server to acknowledge - # messages. While this increases performance, this does not guarantee - # delivery (as messages can be lost after being pre-acknowledged on the - # server). Use with caution if your application design permits it. - # - # * :ack_batch_size - # * the batch size of the acknowledgements - # - def create_session(params={}) - raise "HornetQ::Client::Factory Already Closed" unless @factory - @factory.create_session( - params[:username], - params[:password], - params[:xa] || false, - params[:auto_commit_sends].nil? ? true : params[:auto_commit_sends], - params[:auto_commit_acks].nil? ? true : params[:auto_commit_acks], - params[:pre_acknowledge] || false, - params[:ack_batch_size] || 1) - end + # Create a Session pool + # TODO Raise an exception when gene_pool is not available + def create_session_pool(params={}) + require 'hornetq/client/session_pool' + SessionPool.new(self, params) + end - # Create a session, call the supplied block and once it completes - # close the session. - # See session_create for the Parameters - # - # Returns the result of the block - # - # Example - # HornetQ::Client::Factory.create_session(:uri => 'hornetq://localhost/') do |session| - # session.create_queue("Address", "Queue") - # end - # - # Example: - # require 'hornetq' - # - # factory = nil - # begin - # factory = HornetQ::Client::Factory.new(:uri => 'hornetq://localhost/') - # factory.create_session do |session| - # - # # Create a new queue - # session.create_queue('Example', 'Example', true) - # - # # Create a producer to send messages - # producer = session.create_producer('Example') - # - # # Create a Text Message - # message = session.create_message(HornetQ::Client::Message::TEXT_TYPE,true) - # message.body = 'Hello World' - # - # # Send the message - # producer.send(message) - # end - # ensure - # factory.close if factory - # end - def session(params={}, &proc) - raise "HornetQ::Client::session mandatory block missing" unless proc - session = nil - begin - session = create_session(params) - proc.call(session) - ensure - session.close if session + # Close Factory connections + def close + @factory.close if @factory + @factory = nil end - end - - # Create a Session pool - # TODO Raise an exception when gene_pool is not available - def create_session_pool(params={}) - require 'hornetq/client/session_pool' - SessionPool.new(self, params) - end - - # Close Factory connections - def close - @factory.close if @factory - @factory = nil - end - # Create a new Factory and Session - # - # Creates a new factory and session, then passes the session to the supplied - # block. Upon completion the session and factory are both closed - # See Factory::initialize and Factory::create_session for the list - # of parameters - def self.session(params={},&proc) - raise "Missing mandatory code block" unless proc - factory = nil - session = nil - begin - if params.kind_of?(String) - # TODO: Support passing username and password from URI to Session - factory = self.new(params) - session = factory.session({}, &proc) - else - factory = self.new(params[:connector] || {}) - session = factory.session(params[:session] || {}, &proc) + # Receive messages in a separate thread when they arrive + # Allows messages to be received in a separate thread. I.e. Asynchronously + # This method will return to the caller before messages are processed. + # It is then the callers responsibility to keep the program active so that messages + # can then be processed. + # + # Note: + # + # Session Parameters: + # :options => any of the javax.jms.Session constants + # Default: javax.jms.Session::AUTO_ACKNOWLEDGE + # + # :session_count : Number of sessions to create, each with their own consumer which + # in turn will call the supplied block. + # Note: The supplied block must be thread safe since it will be called + # by several threads at the same time. + # I.e. Don't change instance variables etc. without the + # necessary semaphores etc. + # Default: 1 + # + # Consumer Parameters: + # :queue_name => Name of the Queue to read messages from + # + # :selector => Filter which messages should be returned from the queue + # Default: All messages + # :no_local => Determine whether messages published by its own connection + # should be delivered to it + # Default: false + # + # :statistics Capture statistics on how many messages have been read + # true : This method will capture statistics on the number of messages received + # and the time it took to process them. + # The timer starts when each() is called and finishes when either the last message was received, + # or when Destination::statistics is called. In this case MessageConsumer::statistics + # can be called several times during processing without affecting the end time. + # Also, the start time and message count is not reset until MessageConsumer::each + # is called again with :statistics => true + # + # The statistics gathered are returned when :statistics => true and :async => false + # + # Usage: For transacted sessions (the default) the Proc supplied must return + # either true or false: + # true => The session is committed + # false => The session is rolled back + # Any Exception => The session is rolled back + # + # Notes: + # * Remember to call ::start on the factory otherwise the on_message will not + # start consuming any messages + # * Remember to call message.acknowledge before completing the block so that + # the message will be removed from the queue + # * If the block throws an exception, the + def on_message(parms, &proc) + consumer_count = parms[:session_count] || 1 + consumer_count.times do + session = self.create_session(parms) + consumer = session.create_consumer_from_params(parms) + consumer.on_message(parms, &proc) + @consumers << consumer + @sessions << session end - ensure - session.close if session - factory.close if factory end - end - - # Create a new Factory along with a Session, and then start the session - # - # Creates a new factory and session, then passes the session to the supplied - # block. Upon completion the session and factory are both closed - # See Factory::initialize and Factory::create_session for the list - # of parameters - def self.start(params={},&proc) - session(params) do |session| - session.start - proc.call(session) + + def on_message_statistics + @consumers.collect{|consumer| consumer.on_message_statistics} end - end - - # Call the supplied code block after creating a factory instance - # See initialize for the parameter list - # The factory is closed before returning - # - # Returns the result of the code block - def self.create_factory(params={}, &proc) - raise "Missing mandatory code block" unless proc - factory = nil - result = nil - begin - factory=self.new(params) - result = proc.call(factory) - ensure - factory.close - end - result - end - end - + end + end end \ No newline at end of file