lib/bunny/session.rb in bunny-2.1.0 vs lib/bunny/session.rb in bunny-2.2.0
- old
+ new
@@ -93,10 +93,11 @@
# @param [String, Hash] connection_string_or_opts Connection string or a hash of connection options
# @param [Hash] optz Extra options not related to connection
#
# @option connection_string_or_opts [String] :host ("127.0.0.1") Hostname or IP address to connect to
# @option connection_string_or_opts [Array<String>] :hosts (["127.0.0.1"]) list of hostname or IP addresses to select hostname from when connecting
+ # @option connection_string_or_opts [Array<String>] :addresses (["127.0.0.1:5672"]) list of addresses to select hostname and port from when connecting
# @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on
# @option connection_string_or_opts [String] :username ("guest") Username
# @option connection_string_or_opts [String] :password ("guest") Password
# @option connection_string_or_opts [String] :vhost ("/") Virtual host to use
# @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat.
@@ -111,12 +112,13 @@
# @option connection_string_or_opts [Proc] :hosts_shuffle_strategy A Proc that reorders a list of host strings, defaults to Array#shuffle
# @option connection_string_or_opts [Logger] :logger The logger. If missing, one is created using :log_file and :log_level.
# @option connection_string_or_opts [IO, String] :log_file The file or path to use when creating a logger. Defaults to STDOUT.
# @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT.
# @option connection_string_or_opts [Integer] :log_level The log level to use when creating a logger. Defaults to LOGGER::WARN
- # @option connection_string_or_opts [Boolean] :automatically_recover Should automatically recover from network failures?
- # @option connection_string_or_opts [Integer] :recovery_attempts Max number of recovery attempts
+ # @option connection_string_or_opts [Boolean] :automatically_recover (true) Should automatically recover from network failures?
+ # @option connection_string_or_opts [Integer] :recovery_attempts (nil) Max number of recovery attempts, nil means forever, 0 means never
+ # @option connection_string_or_opts [Boolean] :recover_from_connection_close (true) Recover from server-sent connection.close
#
# @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL
# @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use
#
# @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide
@@ -133,32 +135,33 @@
end.merge(optz)
@default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle }
@opts = opts
- @hosts = self.hostnames_from(opts)
- @host_index = 0
+ @addresses = self.addresses_from(opts)
+ @address_index = 0
- @port = self.port_from(opts)
@user = self.username_from(opts)
@pass = self.password_from(opts)
@vhost = self.vhost_from(opts)
@threaded = opts.fetch(:threaded, true)
log_file = opts[:log_file] || opts[:logfile] || STDOUT
log_level = opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN
@logger = opts.fetch(:logger, init_default_logger(log_file, log_level))
+ validate_connection_options(opts)
+
# should automatic recovery from network failures be used?
@automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
true
else
opts[:automatically_recover] || opts[:automatic_recovery]
end
@recovery_attempts = opts[:recovery_attempts]
@network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
- @recover_from_connection_close = opts.fetch(:recover_from_connection_close, false)
+ @recover_from_connection_close = opts.fetch(:recover_from_connection_close, true)
# in ms
@continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)
@status = :not_connected
@blocked = false
@@ -181,20 +184,30 @@
@channel_mutex = @mutex_impl.new
# transport operations/continuations mutex. A workaround for
# the non-reentrant Ruby mutexes. MK.
@transport_mutex = @mutex_impl.new
@status_mutex = @mutex_impl.new
- @host_index_mutex = @mutex_impl.new
+ @address_index_mutex = @mutex_impl.new
@channels = Hash.new
@origin_thread = Thread.current
self.reset_continuations
self.initialize_transport
end
+ def validate_connection_options(options)
+ if options[:hosts] && options[:addresses]
+ raise ArgumentError, "Connection options can't contain hosts and addresses at the same time"
+ end
+
+ if (options[:host] || options[:hostname]) && (options[:hosts] || options[:addresses])
+ @logger.warn "The connection options contain both a host and an array of hosts, the single host is ignored."
+ end
+ end
+
# @return [String] RabbitMQ hostname (or IP address) used
def hostname; self.host; end
# @return [String] Username used
def username; self.user; end
# @return [String] Password used
@@ -221,17 +234,21 @@
def threaded?
@threaded
end
def host
- @transport ? @transport.host : @hosts[@host_index]
+ @transport ? @transport.host : host_from_address(@addresses[@address_index])
end
- def reset_host_index
- @host_index_mutex.synchronize { @host_index = 0 }
+ def port
+ @transport ? @transport.port : port_from_address(@addresses[@address_index])
end
+ def reset_address_index
+ @address_index_mutex.synchronize { @address_index = 0 }
+ end
+
# @private
attr_reader :mutex_impl
# Provides a way to fine tune the socket used by connection.
# Accepts a block that the socket will be yielded to.
@@ -291,11 +308,11 @@
rescue
@status_mutex.synchronize { @status = :not_connected }
raise
end
rescue HostListDepleted
- self.reset_host_index
+ self.reset_address_index
@status_mutex.synchronize { @status = :not_connected }
raise TCPConnectionFailedForAllHosts
end
self
@@ -657,11 +674,11 @@
@recovering_from_network_failure = false
recover_channels
end
rescue HostListDepleted
- reset_host_index
+ reset_address_index
retry
rescue TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e
@logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds"
sleep @network_recovery_interval
if should_retry_recovery?
@@ -742,17 +759,30 @@
close_transport
end
end
# @private
- def hostnames_from(options)
- options.fetch(:hosts_shuffle_strategy, @default_hosts_shuffle_strategy).call(
- [ options[:hosts] || options[:host] || options[:hostname] || DEFAULT_HOST ].flatten
- )
+ def addresses_from(options)
+ shuffle_strategy = options.fetch(:hosts_shuffle_strategy, @default_hosts_shuffle_strategy)
+
+ addresses = options[:host] || options[:hostname] || options[:addresses] ||
+ options[:hosts] || ["#{DEFAULT_HOST}:#{port_from(options)}"]
+ addresses = [addresses] unless addresses.is_a? Array
+
+ addresses.map! do |address|
+ host_with_port?(address) ? address : "#{address}:#{port_from(@opts)}"
+ end
+
+ shuffle_strategy.call addresses
end
# @private
+ def host_with_port?(address)
+ address.include? ':'
+ end
+
+ # @private
def port_from(options)
fallback = if options[:tls] || options[:ssl]
AMQ::Protocol::TLS_PORT
else
AMQ::Protocol::DEFAULT_PORT
@@ -760,10 +790,20 @@
options.fetch(:port, fallback)
end
# @private
+ def host_from_address(address)
+ address.split(":")[0]
+ end
+
+ # @private
+ def port_from_address(address)
+ address.split(":")[1].to_i
+ end
+
+ # @private
def vhost_from(options)
options[:virtual_host] || options[:vhost] || DEFAULT_VHOST
end
# @private
@@ -944,11 +984,11 @@
# @return [String]
# @api public
def to_s
oid = ("0x%x" % (self.object_id << 1))
- "#<#{self.class.name}:#{oid} #{@user}@#{host}:#{@port}, vhost=#{@vhost}, hosts=[#{@hosts.join(',')}]>"
+ "#<#{self.class.name}:#{oid} #{@user}@#{host}:#{port}, vhost=#{@vhost}, addresses=[#{@addresses.join(',')}]>"
end
def inspect
to_s
end
@@ -1107,13 +1147,17 @@
@heartbeat_sender.stop if @heartbeat_sender
end
# @private
def initialize_transport
- if host = @hosts[ @host_index ]
- @host_index_mutex.synchronize { @host_index += 1 }
+ if address = @addresses[ @address_index ]
+ @address_index_mutex.synchronize { @address_index += 1 }
@transport.close rescue nil # Let's make sure the previous transport socket is closed
- @transport = Transport.new(self, host, @port, @opts.merge(:session_thread => @origin_thread))
+ @transport = Transport.new(self,
+ host_from_address(address),
+ port_from_address(address),
+ @opts.merge(:session_thread => @origin_thread)
+ )
# Reset the cached progname for the logger only when no logger was provided
@default_logger.progname = self.to_s
@transport