lib/hornetq/client/connection.rb in jruby-hornetq-0.3.3 vs lib/hornetq/client/connection.rb in jruby-hornetq-0.4.0

- old
+ new

@@ -32,11 +32,11 @@ # # Creates a new connection and session, then passes the session to the supplied # block. Upon completion the session and connection are both closed # See Connection::initialize and Connection::create_session for the list # of parameters - # + # # Returns result of block def self.start_session(params={},&proc) session(params) do |session| session.start proc.call(session) @@ -61,54 +61,12 @@ result end # Create a new Connection from which sessions can be created # - # Parameters: - # * a Hash consisting of one or more of the named parameters - # * Summary of parameters and their default values - # HornetQ::Client::Connection.new( - # :uri => 'hornetq://localhost', - # :ack_batch_size => , - # :auto_group => , - # :block_on_acknowledge => , - # :block_on_durable_send => , - # :block_on_non_durable_send => , - # :cache_large_messages_client => , - # :call_timeout => , - # :client_failure_check_period => , - # :confirmation_window_size => , - # :connection_load_balancing_policy_class_name => , - # :connection_ttl => , - # :consumer_max_rate => , - # :consumer_window_size => , - # :discovery_address => , - # :discovery_initial_wait_timeout => , - # :discovery_port => , - # :discovery_refresh_timeout => , - # :failover_on_initial_connection => true, - # :failover_on_server_shutdown => true, - # :group_id => , - # :initial_message_packet_size => , - # :java_object => , - # :local_bind_address => , - # :max_retry_interval => , - # :min_large_message_size => , - # :pre_acknowledge => , - # :producer_max_rate => , - # :producer_window_size => , - # :reconnect_attempts => 1, - # :retry_interval => , - # :retry_interval_multiplier => , - # :scheduled_thread_pool_max_size => , - # :static_connectors => , - # :thread_pool_max_size => , - # :use_global_pools => - # ) - # # Mandatory Parameters - # * :uri + # * :uri => 'hornetq://localhost', # * The hornetq uri as to which server to connect with and which # transport protocol to use. Format: # hornetq://server:port,backupserver:port/?protocol=[netty|discover] # * To use the default netty transport # hornetq://server:port @@ -116,48 +74,208 @@ # hornetq://server:port,backupserver:port # * To use auto-discovery # hornetq://server:port/?protocol=discovery # * To use HornetQ within the current JVM # hornetq://invm - # # Optional Parameters - # * :ack_batch_size - # * :auto_group - # * :block_on_acknowledge - # * :block_on_durable_send - # * :block_on_non_durable_send - # * :cache_large_messages_client - # * :call_timeout - # * :client_failure_check_period - # * :confirmation_window_size - # * :connection_load_balancing_policy_class_name - # * :connection_ttl - # * :consumer_max_rate - # * :consumer_window_size - # * :discovery_address - # * :discovery_initial_wait_timeout - # * :discovery_port - # * :discovery_refresh_timeout - # * :failover_on_initial_connection - # * :failover_on_server_shutdown - # * :group_id - # * :initial_message_packet_size - # * :java_object - # * :local_bind_address - # * :max_retry_interval - # * :min_large_message_size - # * :pre_acknowledge - # * :producer_max_rate - # * :producer_window_size - # * :reconnect_attempts - # * :retry_interval - # * :retry_interval_multiplier - # * :scheduled_thread_pool_max_size - # * :static_connectors - # * :thread_pool_max_size - # * :use_global_pools - + # + # High Availability + # + # * :ha => true | false, + # true: Receives cluster topology updates from the cluster as + # servers leave or join and new backups are appointed or removed. + # false: Uses the suplied static list of hosts in :uri + # and no HA backup information is propagated to the client + # Default: false + # + # Flow Control + # + # :ack_batch_size => integer, + # Sets the acknowledgements batch size. Must be > 0 + # + # :pre_acknowledge => true | false, + # Sets whether messages will pre-acknowledged on the server before + # they are sent to the consumers or not + # true : Pre-acknowledge consumed messages on the server before they are sent to consumers + # false: Clients acknowledge the message they consume. + # Default: false + # + # Grouping: + # + # :auto_group => true | false, + # Sets whether producers will automatically assign a group ID + # to sent messages + # true: A random unique group ID is created and set on each message + # for the property Message.HDR_GROUP_ID + # Default: false + # + # :group_id => string, + # Sets the group ID that will be set on each message sent + # Default: nil (no goup id will be set) + # + # Blocking calls: + # + # :block_on_acknowledge => true | false, + # Sets whether consumers created through this factory will block + # while sending message acknowledgements or do it asynchronously. + # Default: false + # + # :block_on_durable_send => true | false, + # Sets whether producers will block while sending durable messages + # or do it asynchronously. + # If the session is configured to send durable message asynchronously, + # the client can set a SendAcknowledgementHandler on the ClientSession + # to be notified once the message has been handled by the server. + # Default: true + # + # :block_on_non_durable_send => true | false, + # Sets whether producers will block while sending non-durable messages + # or do it asynchronously. + # If the session is configured to send non-durable message asynchronously, + # the client can set a SendAcknowledgementHandler on the ClientSession + # to be notified once the message has been handled by the server. + # Default: false + # + # :call_timeout => long, + # Sets the blocking calls timeout in milliseconds. If client's blocking calls to the + # server take more than this timeout, the call will throw a + # HornetQException with the code HornetQException.CONNECTION_TIMEDOUT. + # Value is in milliseconds, default value is HornetQClient.DEFAULT_CALL_TIMEOUT. + # Must be >= 0 + # + # Client Reconnection Parameters: + # + # :connection_ttl => long, + # Set the connection time-to-live + # -1 : Disable + # >=0 : milliseconds the server will keep a connection alive in the + # absence of any data arriving from the client. + # Default: 60,000 + # + # :client_failure_check_period => long, + # Sets the period in milliseconds used to check if a client has + # failed to receive pings from the server. + # Value must be -1 (to disable) or greater than 0 + # Default: 30,000 + # + # :initial_connect_attempts => int, + # ? + # + # :failover_on_initial_connection => true | false, + # Sets whether the client will automatically attempt to connect to + # the backup server if the initial connection to the live server fails + # true : If live server is not reachable try to connect to backup server + # false: Fail to start if live server is not reachable + # Default: false + # + # :max_retry_interval => long, + # Sets the maximum retry interval in milliseconds. + # Only appicable if the retry interval multiplier has been specified + # Default: 2000 (2 seconds) + # + # :reconnect_attempts => 1, + # :retry_interval => long, + # Returns the time to retry the connection after failure. + # Value is in milliseconds. + # Default: 2000 (2 seconds) + # + # :retry_interval_multiplier => double, + # Sets the multiplier to apply to successive retry intervals. + # Value must be positive. + # Default: 1 + # + # Large Message parameters: + # + # :cache_large_messages_client => true | false, + # Sets whether large messages received by consumers will be + # cached in temporary files or not. + # When true, consumers will create temporary files to cache large messages. + # There is 1 temporary file created for each large message. + # Default: false + # + # :min_large_message_size => int, + # Sets the large message size threshold in bytes. Value must be > 0 + # Messages whose size is if greater than this value will be handled as large messages + # Default: 102400 bytes (100 KBytes) + # + # :compress_large_message => true | false, + # + # Message Rate Management: + # + # :consumer_max_rate => int, + # Sets the maximum rate of message consumption for consumers. + # Controls the rate at which a consumer can consume messages. + # A consumer will never consume messages at a rate faster than the + # rate specified. + # -1 : Disable + # >=0 : Maximum desired message consumption rate specified + # in units of messages per second. + # Default: -1 + # + # :producer_max_rate => int, + # Sets the maximum rate of message production for producers. + # Controls the rate at which a producer can produce messages. + # A producer will never produce messages at a rate faster than the rate specified. + # -1 : Disabled + # >0 : Maximum desired message production rate specified in units of messages per second. + # Default: -1 (Disabled) + # + # Thread Pools: + # + # :scheduled_thread_pool_max_size => int, + # Sets the maximum size of the scheduled thread pool. + # This setting is relevant only if this factory does not use global pools. + # Value must be greater than 0. + # Default: 5 + # + # :thread_pool_max_size => int, + # Sets the maximum size of the thread pool. + # This setting is relevant only if this factory does not use + # global pools. + # -1 : Unlimited thread pool + # >0 : Number of threads in pool + # Default: -1 (Unlimited) + # + # :use_global_pools => true | false, + # Sets whether this factory will use global thread pools + # (shared among all the factories in the same JVM) or its own pools. + # true: Uses global JVM thread pools across all HornetQ connections + # false: Use a thread pool just for this connection + # Default: true + # + # Window Sizes: + # + # :confirmation_window_size => int, + # Set the size in bytes for the confirmation window of this connection. + # -1 : Disable the window + # >0 : Size in bytes + # Default: -1 (Disabled) + # + # :consumer_window_size => int, + # Sets the window size for flow control for consumers. + # -1 : Disable flow control + # 0 : Do Not buffer any messages + # >0 : Set the maximum size of the buffer + # Default: 1048576 (1 MB) + # + # :producer_window_size => int, + # Sets the window size for flow control of the producers. + # -1 : Disable flow control + # >0 : The maximum amount of bytes at any give time (to prevent overloading the connection). + # Default: 65536 (64 KBytes) + # + # Other: + # + # :connection_load_balancing_policy_class_name => string, + # Set the class name of the connection load balancing policy + # Value must be the name of a class implementing org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy + # Default: "org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy" + # + # :initial_message_packet_size => int, + # Sets the initial size of messages in bytes + # Value must be greater than 0 + # def initialize(params={}) params =params.clone uri = nil # TODO: Support :uri as an array for cluster configurations if params.kind_of?(String) @@ -168,54 +286,59 @@ uri = HornetQ::URI.new(params.delete(:uri)) # params override uri params params = uri.params.merge(params) end - @connection = nil - @sessions = [] - @consumers = [] # In-VM Transport has no fail-over or additional parameters @is_invm = uri.host == 'invm' + transport_list = [] if @is_invm - transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::INVM_CONNECTOR_CLASS_NAME) - @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport) - elsif params[:protocol] - # Auto-Discovery just has a host name and port - if params[:protocol] == 'discovery' - @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(uri.host, uri.port) - elsif params[:protocol] != 'netty' - raise "Unknown HornetQ protocol:#{params[:protocol]}" - end - end + transport_list << Java::org.hornetq.api.core::TransportConfiguration.new(HornetQ::INVM_CONNECTOR_CLASS_NAME) + else + case params[:protocol] + when 'discovery' + #TODO: Also support: DiscoveryGroupConfiguration(String name, String localBindAddress, String groupAddress, int groupPort, long refreshTimeout, long discoveryInitialWaitTimeout) + transport_list << Java::org.hornetq.api.core::DiscoveryGroupConfiguration.new(uri.host, uri.port) + when 'netty', nil + transport_list << Java::org.hornetq.api.core::TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.host, 'port' => uri.port }) - # Unless already created, then the connection will use the netty protocol - unless @connection - # Primary Transport - transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.host, 'port' => uri.port }) - - # Check for backup server connection information - if uri.backup_host - backup_transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.backup_host, 'port' => uri.backup_port }) - @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport, backup_transport) + if uri.backup_host + transport_list << Java::org.hornetq.api.core::TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.backup_host, 'port' => uri.backup_port }) + end else - @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport) + raise "Unknown HornetQ protocol:'#{params[:protocol]}'" end end - # If any other options were supplied, apply them to the created Connection instance + #TODO: Support: server_locator.addInterceptor + + # Create server locator with or without HA. Without HA being the default + @server_locator = if params[:ha] + Java::org.hornetq.api.core.client::HornetQClient.createServerLocatorWithHA(*transport_list) + #TODO: Support: server_locator.addClusterTopologyListener + else + Java::org.hornetq.api.core.client::HornetQClient.createServerLocatorWithoutHA(*transport_list) + end + + # If any other options were supplied, apply them to the server locator params.each_pair do |key, val| method = key.to_s+'=' - if @connection.respond_to? method - @connection.send method, val - #puts "Debug: #{key} = #{@connection.send key}" if @connection.respond_to? key.to_sym + if @server_locator.respond_to? method + @server_locator.send method, val + HornetQ.logger.trace { "HornetQ ServerLocator setting: #{key} = #{@connection.send key}" } if @server_locator.respond_to? key.to_sym else - HornetQ.logger.warn "Warning: Option:#{key}, with value:#{val} is invalid and being ignored" + HornetQ.logger.warn "Warning: Option:#{key}, with value:#{val} is invalid and will be ignored" end end + + @connection = @server_locator.createSessionFactory + # For handling managed sessions and consumers + @sessions = [] + @consumers = [] end - # Return true if this connection was configured in INVM transport protocol + # Return true if this connection was configured to use INVM transport protocol def invm? @is_invm end # Create a new HornetQ session @@ -368,13 +491,13 @@ ensure session.close if session end end - # Create a session, start the session, call the supplied block + # Create a session, start the session, call the supplied block # and once the block completes close the session. - # + # # See: #session_create for the Parameters # # Returns the result of the block # # @@ -415,11 +538,13 @@ # Close Connection connections def close @sessions.each { |session| session.close } @connection.close if @connection + @server_locator.close if @server_locator @connection = nil + @server_locator = nil end # Receive messages in a separate thread when they arrive # Allows messages to be received in a separate thread. I.e. Asynchronously # This method will return to the caller before messages are processed. @@ -488,16 +613,16 @@ end def on_message_statistics @consumers.collect{|consumer| consumer.on_message_statistics} end - + # Start all sessions managed by this connection - # + # # Sessions created via #create_session are not managed unless # :managed => true was specified when the session was created - # + # # Session are Only managed when created through the following methods: # Connection#on_message # Connection#create_session And :managed => true # # This call does not do anything to sessions in a session pool @@ -511,7 +636,7 @@ # See: #start_managed_sessions for details on which sessions are managed def stop_managed_sessions @sessions.each {|session| session.stop} end end - end + end end \ No newline at end of file