lib/hornetq/client/connection.rb in jruby-hornetq-0.3.3 vs lib/hornetq/client/connection.rb in jruby-hornetq-0.4.0
- old
+ new
@@ -32,11 +32,11 @@
#
# Creates a new connection and session, then passes the session to the supplied
# block. Upon completion the session and connection are both closed
# See Connection::initialize and Connection::create_session for the list
# of parameters
- #
+ #
# Returns result of block
def self.start_session(params={},&proc)
session(params) do |session|
session.start
proc.call(session)
@@ -61,54 +61,12 @@
result
end
# Create a new Connection from which sessions can be created
#
- # Parameters:
- # * a Hash consisting of one or more of the named parameters
- # * Summary of parameters and their default values
- # HornetQ::Client::Connection.new(
- # :uri => 'hornetq://localhost',
- # :ack_batch_size => ,
- # :auto_group => ,
- # :block_on_acknowledge => ,
- # :block_on_durable_send => ,
- # :block_on_non_durable_send => ,
- # :cache_large_messages_client => ,
- # :call_timeout => ,
- # :client_failure_check_period => ,
- # :confirmation_window_size => ,
- # :connection_load_balancing_policy_class_name => ,
- # :connection_ttl => ,
- # :consumer_max_rate => ,
- # :consumer_window_size => ,
- # :discovery_address => ,
- # :discovery_initial_wait_timeout => ,
- # :discovery_port => ,
- # :discovery_refresh_timeout => ,
- # :failover_on_initial_connection => true,
- # :failover_on_server_shutdown => true,
- # :group_id => ,
- # :initial_message_packet_size => ,
- # :java_object => ,
- # :local_bind_address => ,
- # :max_retry_interval => ,
- # :min_large_message_size => ,
- # :pre_acknowledge => ,
- # :producer_max_rate => ,
- # :producer_window_size => ,
- # :reconnect_attempts => 1,
- # :retry_interval => ,
- # :retry_interval_multiplier => ,
- # :scheduled_thread_pool_max_size => ,
- # :static_connectors => ,
- # :thread_pool_max_size => ,
- # :use_global_pools =>
- # )
- #
# Mandatory Parameters
- # * :uri
+ # * :uri => 'hornetq://localhost',
# * The hornetq uri as to which server to connect with and which
# transport protocol to use. Format:
# hornetq://server:port,backupserver:port/?protocol=[netty|discover]
# * To use the default netty transport
# hornetq://server:port
@@ -116,48 +74,208 @@
# hornetq://server:port,backupserver:port
# * To use auto-discovery
# hornetq://server:port/?protocol=discovery
# * To use HornetQ within the current JVM
# hornetq://invm
- #
# Optional Parameters
- # * :ack_batch_size
- # * :auto_group
- # * :block_on_acknowledge
- # * :block_on_durable_send
- # * :block_on_non_durable_send
- # * :cache_large_messages_client
- # * :call_timeout
- # * :client_failure_check_period
- # * :confirmation_window_size
- # * :connection_load_balancing_policy_class_name
- # * :connection_ttl
- # * :consumer_max_rate
- # * :consumer_window_size
- # * :discovery_address
- # * :discovery_initial_wait_timeout
- # * :discovery_port
- # * :discovery_refresh_timeout
- # * :failover_on_initial_connection
- # * :failover_on_server_shutdown
- # * :group_id
- # * :initial_message_packet_size
- # * :java_object
- # * :local_bind_address
- # * :max_retry_interval
- # * :min_large_message_size
- # * :pre_acknowledge
- # * :producer_max_rate
- # * :producer_window_size
- # * :reconnect_attempts
- # * :retry_interval
- # * :retry_interval_multiplier
- # * :scheduled_thread_pool_max_size
- # * :static_connectors
- # * :thread_pool_max_size
- # * :use_global_pools
-
+ #
+ # High Availability
+ #
+ # * :ha => true | false,
+ # true: Receives cluster topology updates from the cluster as
+ # servers leave or join and new backups are appointed or removed.
+ # false: Uses the suplied static list of hosts in :uri
+ # and no HA backup information is propagated to the client
+ # Default: false
+ #
+ # Flow Control
+ #
+ # :ack_batch_size => integer,
+ # Sets the acknowledgements batch size. Must be > 0
+ #
+ # :pre_acknowledge => true | false,
+ # Sets whether messages will pre-acknowledged on the server before
+ # they are sent to the consumers or not
+ # true : Pre-acknowledge consumed messages on the server before they are sent to consumers
+ # false: Clients acknowledge the message they consume.
+ # Default: false
+ #
+ # Grouping:
+ #
+ # :auto_group => true | false,
+ # Sets whether producers will automatically assign a group ID
+ # to sent messages
+ # true: A random unique group ID is created and set on each message
+ # for the property Message.HDR_GROUP_ID
+ # Default: false
+ #
+ # :group_id => string,
+ # Sets the group ID that will be set on each message sent
+ # Default: nil (no goup id will be set)
+ #
+ # Blocking calls:
+ #
+ # :block_on_acknowledge => true | false,
+ # Sets whether consumers created through this factory will block
+ # while sending message acknowledgements or do it asynchronously.
+ # Default: false
+ #
+ # :block_on_durable_send => true | false,
+ # Sets whether producers will block while sending durable messages
+ # or do it asynchronously.
+ # If the session is configured to send durable message asynchronously,
+ # the client can set a SendAcknowledgementHandler on the ClientSession
+ # to be notified once the message has been handled by the server.
+ # Default: true
+ #
+ # :block_on_non_durable_send => true | false,
+ # Sets whether producers will block while sending non-durable messages
+ # or do it asynchronously.
+ # If the session is configured to send non-durable message asynchronously,
+ # the client can set a SendAcknowledgementHandler on the ClientSession
+ # to be notified once the message has been handled by the server.
+ # Default: false
+ #
+ # :call_timeout => long,
+ # Sets the blocking calls timeout in milliseconds. If client's blocking calls to the
+ # server take more than this timeout, the call will throw a
+ # HornetQException with the code HornetQException.CONNECTION_TIMEDOUT.
+ # Value is in milliseconds, default value is HornetQClient.DEFAULT_CALL_TIMEOUT.
+ # Must be >= 0
+ #
+ # Client Reconnection Parameters:
+ #
+ # :connection_ttl => long,
+ # Set the connection time-to-live
+ # -1 : Disable
+ # >=0 : milliseconds the server will keep a connection alive in the
+ # absence of any data arriving from the client.
+ # Default: 60,000
+ #
+ # :client_failure_check_period => long,
+ # Sets the period in milliseconds used to check if a client has
+ # failed to receive pings from the server.
+ # Value must be -1 (to disable) or greater than 0
+ # Default: 30,000
+ #
+ # :initial_connect_attempts => int,
+ # ?
+ #
+ # :failover_on_initial_connection => true | false,
+ # Sets whether the client will automatically attempt to connect to
+ # the backup server if the initial connection to the live server fails
+ # true : If live server is not reachable try to connect to backup server
+ # false: Fail to start if live server is not reachable
+ # Default: false
+ #
+ # :max_retry_interval => long,
+ # Sets the maximum retry interval in milliseconds.
+ # Only appicable if the retry interval multiplier has been specified
+ # Default: 2000 (2 seconds)
+ #
+ # :reconnect_attempts => 1,
+ # :retry_interval => long,
+ # Returns the time to retry the connection after failure.
+ # Value is in milliseconds.
+ # Default: 2000 (2 seconds)
+ #
+ # :retry_interval_multiplier => double,
+ # Sets the multiplier to apply to successive retry intervals.
+ # Value must be positive.
+ # Default: 1
+ #
+ # Large Message parameters:
+ #
+ # :cache_large_messages_client => true | false,
+ # Sets whether large messages received by consumers will be
+ # cached in temporary files or not.
+ # When true, consumers will create temporary files to cache large messages.
+ # There is 1 temporary file created for each large message.
+ # Default: false
+ #
+ # :min_large_message_size => int,
+ # Sets the large message size threshold in bytes. Value must be > 0
+ # Messages whose size is if greater than this value will be handled as large messages
+ # Default: 102400 bytes (100 KBytes)
+ #
+ # :compress_large_message => true | false,
+ #
+ # Message Rate Management:
+ #
+ # :consumer_max_rate => int,
+ # Sets the maximum rate of message consumption for consumers.
+ # Controls the rate at which a consumer can consume messages.
+ # A consumer will never consume messages at a rate faster than the
+ # rate specified.
+ # -1 : Disable
+ # >=0 : Maximum desired message consumption rate specified
+ # in units of messages per second.
+ # Default: -1
+ #
+ # :producer_max_rate => int,
+ # Sets the maximum rate of message production for producers.
+ # Controls the rate at which a producer can produce messages.
+ # A producer will never produce messages at a rate faster than the rate specified.
+ # -1 : Disabled
+ # >0 : Maximum desired message production rate specified in units of messages per second.
+ # Default: -1 (Disabled)
+ #
+ # Thread Pools:
+ #
+ # :scheduled_thread_pool_max_size => int,
+ # Sets the maximum size of the scheduled thread pool.
+ # This setting is relevant only if this factory does not use global pools.
+ # Value must be greater than 0.
+ # Default: 5
+ #
+ # :thread_pool_max_size => int,
+ # Sets the maximum size of the thread pool.
+ # This setting is relevant only if this factory does not use
+ # global pools.
+ # -1 : Unlimited thread pool
+ # >0 : Number of threads in pool
+ # Default: -1 (Unlimited)
+ #
+ # :use_global_pools => true | false,
+ # Sets whether this factory will use global thread pools
+ # (shared among all the factories in the same JVM) or its own pools.
+ # true: Uses global JVM thread pools across all HornetQ connections
+ # false: Use a thread pool just for this connection
+ # Default: true
+ #
+ # Window Sizes:
+ #
+ # :confirmation_window_size => int,
+ # Set the size in bytes for the confirmation window of this connection.
+ # -1 : Disable the window
+ # >0 : Size in bytes
+ # Default: -1 (Disabled)
+ #
+ # :consumer_window_size => int,
+ # Sets the window size for flow control for consumers.
+ # -1 : Disable flow control
+ # 0 : Do Not buffer any messages
+ # >0 : Set the maximum size of the buffer
+ # Default: 1048576 (1 MB)
+ #
+ # :producer_window_size => int,
+ # Sets the window size for flow control of the producers.
+ # -1 : Disable flow control
+ # >0 : The maximum amount of bytes at any give time (to prevent overloading the connection).
+ # Default: 65536 (64 KBytes)
+ #
+ # Other:
+ #
+ # :connection_load_balancing_policy_class_name => string,
+ # Set the class name of the connection load balancing policy
+ # Value must be the name of a class implementing org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy
+ # Default: "org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy"
+ #
+ # :initial_message_packet_size => int,
+ # Sets the initial size of messages in bytes
+ # Value must be greater than 0
+ #
def initialize(params={})
params =params.clone
uri = nil
# TODO: Support :uri as an array for cluster configurations
if params.kind_of?(String)
@@ -168,54 +286,59 @@
uri = HornetQ::URI.new(params.delete(:uri))
# params override uri params
params = uri.params.merge(params)
end
- @connection = nil
- @sessions = []
- @consumers = []
# In-VM Transport has no fail-over or additional parameters
@is_invm = uri.host == 'invm'
+ transport_list = []
if @is_invm
- transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::INVM_CONNECTOR_CLASS_NAME)
- @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport)
- elsif params[:protocol]
- # Auto-Discovery just has a host name and port
- if params[:protocol] == 'discovery'
- @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(uri.host, uri.port)
- elsif params[:protocol] != 'netty'
- raise "Unknown HornetQ protocol:#{params[:protocol]}"
- end
- end
+ transport_list << Java::org.hornetq.api.core::TransportConfiguration.new(HornetQ::INVM_CONNECTOR_CLASS_NAME)
+ else
+ case params[:protocol]
+ when 'discovery'
+ #TODO: Also support: DiscoveryGroupConfiguration(String name, String localBindAddress, String groupAddress, int groupPort, long refreshTimeout, long discoveryInitialWaitTimeout)
+ transport_list << Java::org.hornetq.api.core::DiscoveryGroupConfiguration.new(uri.host, uri.port)
+ when 'netty', nil
+ transport_list << Java::org.hornetq.api.core::TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.host, 'port' => uri.port })
- # Unless already created, then the connection will use the netty protocol
- unless @connection
- # Primary Transport
- transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.host, 'port' => uri.port })
-
- # Check for backup server connection information
- if uri.backup_host
- backup_transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.backup_host, 'port' => uri.backup_port })
- @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport, backup_transport)
+ if uri.backup_host
+ transport_list << Java::org.hornetq.api.core::TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.backup_host, 'port' => uri.backup_port })
+ end
else
- @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport)
+ raise "Unknown HornetQ protocol:'#{params[:protocol]}'"
end
end
- # If any other options were supplied, apply them to the created Connection instance
+ #TODO: Support: server_locator.addInterceptor
+
+ # Create server locator with or without HA. Without HA being the default
+ @server_locator = if params[:ha]
+ Java::org.hornetq.api.core.client::HornetQClient.createServerLocatorWithHA(*transport_list)
+ #TODO: Support: server_locator.addClusterTopologyListener
+ else
+ Java::org.hornetq.api.core.client::HornetQClient.createServerLocatorWithoutHA(*transport_list)
+ end
+
+ # If any other options were supplied, apply them to the server locator
params.each_pair do |key, val|
method = key.to_s+'='
- if @connection.respond_to? method
- @connection.send method, val
- #puts "Debug: #{key} = #{@connection.send key}" if @connection.respond_to? key.to_sym
+ if @server_locator.respond_to? method
+ @server_locator.send method, val
+ HornetQ.logger.trace { "HornetQ ServerLocator setting: #{key} = #{@connection.send key}" } if @server_locator.respond_to? key.to_sym
else
- HornetQ.logger.warn "Warning: Option:#{key}, with value:#{val} is invalid and being ignored"
+ HornetQ.logger.warn "Warning: Option:#{key}, with value:#{val} is invalid and will be ignored"
end
end
+
+ @connection = @server_locator.createSessionFactory
+ # For handling managed sessions and consumers
+ @sessions = []
+ @consumers = []
end
- # Return true if this connection was configured in INVM transport protocol
+ # Return true if this connection was configured to use INVM transport protocol
def invm?
@is_invm
end
# Create a new HornetQ session
@@ -368,13 +491,13 @@
ensure
session.close if session
end
end
- # Create a session, start the session, call the supplied block
+ # Create a session, start the session, call the supplied block
# and once the block completes close the session.
- #
+ #
# See: #session_create for the Parameters
#
# Returns the result of the block
#
#
@@ -415,11 +538,13 @@
# Close Connection connections
def close
@sessions.each { |session| session.close }
@connection.close if @connection
+ @server_locator.close if @server_locator
@connection = nil
+ @server_locator = nil
end
# Receive messages in a separate thread when they arrive
# Allows messages to be received in a separate thread. I.e. Asynchronously
# This method will return to the caller before messages are processed.
@@ -488,16 +613,16 @@
end
def on_message_statistics
@consumers.collect{|consumer| consumer.on_message_statistics}
end
-
+
# Start all sessions managed by this connection
- #
+ #
# Sessions created via #create_session are not managed unless
# :managed => true was specified when the session was created
- #
+ #
# Session are Only managed when created through the following methods:
# Connection#on_message
# Connection#create_session And :managed => true
#
# This call does not do anything to sessions in a session pool
@@ -511,7 +636,7 @@
# See: #start_managed_sessions for details on which sessions are managed
def stop_managed_sessions
@sessions.each {|session| session.stop}
end
end
- end
+ end
end
\ No newline at end of file