lib/bunny/session.rb in bunny-1.4.1 vs lib/bunny/session.rb in bunny-1.5.0.pre1
- old
+ new
@@ -80,13 +80,12 @@
# API
#
# @return [Bunny::Transport]
attr_reader :transport
- attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :channel_max, :threaded
+ attr_reader :status, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :channel_max, :threaded
attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales
- attr_reader :default_channel
attr_reader :channel_id_allocator
# Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
# @return [String]
attr_reader :mechanism
# @return [Logger]
@@ -97,22 +96,25 @@
# @param [String, Hash] connection_string_or_opts Connection string or a hash of connection options
# @param [Hash] optz Extra options not related to connection
#
# @option connection_string_or_opts [String] :host ("127.0.0.1") Hostname or IP address to connect to
+ # @option connection_string_or_opts [Array<String>] :hosts (["127.0.0.1"]) list of hostname or IP addresses to select hostname from when connecting
# @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on
# @option connection_string_or_opts [String] :username ("guest") Username
# @option connection_string_or_opts [String] :password ("guest") Password
# @option connection_string_or_opts [String] :vhost ("/") Virtual host to use
# @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat.
# @option connection_string_or_opts [Integer] :network_recovery_interval (4) Recovery interval periodic network recovery will use. This includes initial pause after network failure.
# @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used?
# @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem)
# @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem)
# @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration
+ # @option connection_string_or_opts [String] :verify_peer (true) Whether TLS peer verification should be performed
# @option connection_string_or_opts [Integer] :continuation_timeout (4000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds.
# @option connection_string_or_opts [Integer] :connection_timeout (5) Timeout in seconds for connecting to the server.
+ # @option connection_string_or_opts [Proc] :hosts_shuffle_strategy A Proc that reorders a list of host strings, defaults to Array#shuffle
#
# @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL
# @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use
#
# @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide
@@ -126,12 +128,16 @@
self.class.parse_uri(ENV["RABBITMQ_URL"] || connection_string_or_opts)
when Hash then
connection_string_or_opts
end.merge(optz)
+ @default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle }
+
@opts = opts
- @host = self.hostname_from(opts)
+ @hosts = self.hostnames_from(opts)
+ @host_index = 0
+
@port = self.port_from(opts)
@user = self.username_from(opts)
@pass = self.password_from(opts)
@vhost = self.vhost_from(opts)
@logfile = opts[:log_file] || opts[:logfile] || STDOUT
@@ -171,10 +177,12 @@
@channel_mutex = @mutex_impl.new
# transport operations/continuations mutex. A workaround for
# the non-reentrant Ruby mutexes. MK.
@transport_mutex = @mutex_impl.new
@status_mutex = @mutex_impl.new
+ @host_index_mutex = @mutex_impl.new
+
@channels = Hash.new
@origin_thread = Thread.current
self.reset_continuations
@@ -208,10 +216,18 @@
# @return [Boolean] true if this connection uses a separate thread for I/O activity
def threaded?
@threaded
end
+ def host
+ @transport ? @transport.host : @hosts[@host_index]
+ end
+
+ def reset_host_index
+ @host_index_mutex.synchronize { @host_index = 0 }
+ end
+
# @private
attr_reader :mutex_impl
# Provides a way to fine tune the socket used by connection.
# Accepts a block that the socket will be yielded to.
@@ -230,40 +246,56 @@
#
# @see http://rubybunny.info/articles/getting_started.html
# @see http://rubybunny.info/articles/connecting.html
# @api public
def start
+
return self if connected?
@status_mutex.synchronize { @status = :connecting }
# reset here for cases when automatic network recovery kicks in
# when we were blocked. MK.
@blocked = false
self.reset_continuations
begin
- # close existing transport if we have one,
- # to not leak sockets
- @transport.maybe_initialize_socket
- @transport.post_initialize_socket
- @transport.connect
+ begin
- if @socket_configurator
- @transport.configure_socket(&@socket_configurator)
- end
+ # close existing transport if we have one,
+ # to not leak sockets
+ @transport.maybe_initialize_socket
- self.init_connection
- self.open_connection
+ @transport.post_initialize_socket
+ @transport.connect
- @reader_loop = nil
- self.start_reader_loop if threaded?
+ if @socket_configurator
+ @transport.configure_socket(&@socket_configurator)
+ end
- @default_channel = self.create_channel unless @default_channel
- rescue Exception => e
+ self.init_connection
+ self.open_connection
+
+ @reader_loop = nil
+ self.start_reader_loop if threaded?
+
+ rescue TCPConnectionFailed => e
+ self.initialize_transport
+
+ @logger.warn e.message
+ @logger.warn "Retrying connection on next host in line: #{@transport.host}:#{@transport.port}"
+
+ return self.start
+ rescue Exception
+ @status_mutex.synchronize { @status = :not_connected }
+ raise
+ end
+
+ rescue HostListDepleted
+ self.reset_host_index
@status_mutex.synchronize { @status = :not_connected }
- raise e
+ raise TCPConnectionFailedForAllHosts
end
self
end
@@ -353,44 +385,10 @@
# @return [Boolean] true if this connection has automatic recovery from network failure enabled
def automatically_recover?
@automatically_recover
end
- #
- # Backwards compatibility
- #
-
- # @private
- def queue(*args)
- @default_channel.queue(*args)
- end
-
- # @private
- def direct(*args)
- @default_channel.direct(*args)
- end
-
- # @private
- def fanout(*args)
- @default_channel.fanout(*args)
- end
-
- # @private
- def topic(*args)
- @default_channel.topic(*args)
- end
-
- # @private
- def headers(*args)
- @default_channel.headers(*args)
- end
-
- # @private
- def exchange(*args)
- @default_channel.exchange(*args)
- end
-
# Defines a callback that will be executed when RabbitMQ blocks the connection
# because it is running low on memory or disk space (as configured via config file
# and/or rabbitmqctl).
#
# @yield [AMQ::Protocol::Connection::Blocked] connection.blocked method which provides a reason for blocking
@@ -644,27 +642,32 @@
# @private
def recover_from_network_failure
begin
sleep @network_recovery_interval
@logger.debug "About to start connection recovery..."
+
+ self.reset_host_index # since we are starting a fresh try.
self.initialize_transport
self.start
if open?
@recovering_from_network_failure = false
recover_channels
end
- rescue TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e
+ rescue TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e
@logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds"
sleep @network_recovery_interval
retry if recoverable_network_failure?(e)
end
end
# @private
def recover_channels
+ # default channel is reopened right after connection
+ # negotiation is completed, so make sure we do not try to open
+ # it twice. MK.
@channels.each do |n, ch|
ch.open
ch.recover_from_network_failure
end
@@ -724,12 +727,14 @@
close_transport
end
end
# @private
- def hostname_from(options)
- options[:host] || options[:hostname] || DEFAULT_HOST
+ def hostnames_from(options)
+ options.fetch(:hosts_shuffle_strategy, @default_hosts_shuffle_strategy).call(
+ [ options[:hosts] || options[:host] || options[:hostname] || DEFAULT_HOST ].flatten
+ )
end
# @private
def port_from(options)
fallback = if options[:tls] || options[:ssl]
@@ -921,11 +926,11 @@
end # send_frameset_without_timeout(frames)
# @return [String]
# @api public
def to_s
- "#<#{self.class.name}:#{object_id} #{@user}@#{@host}:#{@port}, vhost=#{@vhost}>"
+ "#<#{self.class.name}:#{object_id} #{@user}@#{host}:#{@port}, vhost=#{@vhost}, hosts=[#{@hosts.join(',')}]>"
end
protected
# @private
@@ -1074,10 +1079,15 @@
@heartbeat_sender.stop if @heartbeat_sender
end
# @private
def initialize_transport
- @transport = Transport.new(self, @host, @port, @opts.merge(:session_thread => @origin_thread))
+ if host = @hosts[ @host_index ]
+ @host_index_mutex.synchronize { @host_index += 1 }
+ @transport = Transport.new(self, host, @port, @opts.merge(:session_thread => @origin_thread))
+ else
+ raise HostListDepleted
+ end
end
# @private
def maybe_close_transport
@transport.close if @transport