lib/bunny/session.rb in bunny-2.1.0 vs lib/bunny/session.rb in bunny-2.2.0

- old
+ new

@@ -93,10 +93,11 @@ # @param [String, Hash] connection_string_or_opts Connection string or a hash of connection options # @param [Hash] optz Extra options not related to connection # # @option connection_string_or_opts [String] :host ("127.0.0.1") Hostname or IP address to connect to # @option connection_string_or_opts [Array<String>] :hosts (["127.0.0.1"]) list of hostname or IP addresses to select hostname from when connecting + # @option connection_string_or_opts [Array<String>] :addresses (["127.0.0.1:5672"]) list of addresses to select hostname and port from when connecting # @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on # @option connection_string_or_opts [String] :username ("guest") Username # @option connection_string_or_opts [String] :password ("guest") Password # @option connection_string_or_opts [String] :vhost ("/") Virtual host to use # @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat. @@ -111,12 +112,13 @@ # @option connection_string_or_opts [Proc] :hosts_shuffle_strategy A Proc that reorders a list of host strings, defaults to Array#shuffle # @option connection_string_or_opts [Logger] :logger The logger. If missing, one is created using :log_file and :log_level. # @option connection_string_or_opts [IO, String] :log_file The file or path to use when creating a logger. Defaults to STDOUT. # @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT. # @option connection_string_or_opts [Integer] :log_level The log level to use when creating a logger. Defaults to LOGGER::WARN - # @option connection_string_or_opts [Boolean] :automatically_recover Should automatically recover from network failures? - # @option connection_string_or_opts [Integer] :recovery_attempts Max number of recovery attempts + # @option connection_string_or_opts [Boolean] :automatically_recover (true) Should automatically recover from network failures? + # @option connection_string_or_opts [Integer] :recovery_attempts (nil) Max number of recovery attempts, nil means forever, 0 means never + # @option connection_string_or_opts [Boolean] :recover_from_connection_close (true) Recover from server-sent connection.close # # @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL # @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use # # @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide @@ -133,32 +135,33 @@ end.merge(optz) @default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle } @opts = opts - @hosts = self.hostnames_from(opts) - @host_index = 0 + @addresses = self.addresses_from(opts) + @address_index = 0 - @port = self.port_from(opts) @user = self.username_from(opts) @pass = self.password_from(opts) @vhost = self.vhost_from(opts) @threaded = opts.fetch(:threaded, true) log_file = opts[:log_file] || opts[:logfile] || STDOUT log_level = opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN @logger = opts.fetch(:logger, init_default_logger(log_file, log_level)) + validate_connection_options(opts) + # should automatic recovery from network failures be used? @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil? true else opts[:automatically_recover] || opts[:automatic_recovery] end @recovery_attempts = opts[:recovery_attempts] @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL) - @recover_from_connection_close = opts.fetch(:recover_from_connection_close, false) + @recover_from_connection_close = opts.fetch(:recover_from_connection_close, true) # in ms @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT) @status = :not_connected @blocked = false @@ -181,20 +184,30 @@ @channel_mutex = @mutex_impl.new # transport operations/continuations mutex. A workaround for # the non-reentrant Ruby mutexes. MK. @transport_mutex = @mutex_impl.new @status_mutex = @mutex_impl.new - @host_index_mutex = @mutex_impl.new + @address_index_mutex = @mutex_impl.new @channels = Hash.new @origin_thread = Thread.current self.reset_continuations self.initialize_transport end + def validate_connection_options(options) + if options[:hosts] && options[:addresses] + raise ArgumentError, "Connection options can't contain hosts and addresses at the same time" + end + + if (options[:host] || options[:hostname]) && (options[:hosts] || options[:addresses]) + @logger.warn "The connection options contain both a host and an array of hosts, the single host is ignored." + end + end + # @return [String] RabbitMQ hostname (or IP address) used def hostname; self.host; end # @return [String] Username used def username; self.user; end # @return [String] Password used @@ -221,17 +234,21 @@ def threaded? @threaded end def host - @transport ? @transport.host : @hosts[@host_index] + @transport ? @transport.host : host_from_address(@addresses[@address_index]) end - def reset_host_index - @host_index_mutex.synchronize { @host_index = 0 } + def port + @transport ? @transport.port : port_from_address(@addresses[@address_index]) end + def reset_address_index + @address_index_mutex.synchronize { @address_index = 0 } + end + # @private attr_reader :mutex_impl # Provides a way to fine tune the socket used by connection. # Accepts a block that the socket will be yielded to. @@ -291,11 +308,11 @@ rescue @status_mutex.synchronize { @status = :not_connected } raise end rescue HostListDepleted - self.reset_host_index + self.reset_address_index @status_mutex.synchronize { @status = :not_connected } raise TCPConnectionFailedForAllHosts end self @@ -657,11 +674,11 @@ @recovering_from_network_failure = false recover_channels end rescue HostListDepleted - reset_host_index + reset_address_index retry rescue TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e @logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds" sleep @network_recovery_interval if should_retry_recovery? @@ -742,17 +759,30 @@ close_transport end end # @private - def hostnames_from(options) - options.fetch(:hosts_shuffle_strategy, @default_hosts_shuffle_strategy).call( - [ options[:hosts] || options[:host] || options[:hostname] || DEFAULT_HOST ].flatten - ) + def addresses_from(options) + shuffle_strategy = options.fetch(:hosts_shuffle_strategy, @default_hosts_shuffle_strategy) + + addresses = options[:host] || options[:hostname] || options[:addresses] || + options[:hosts] || ["#{DEFAULT_HOST}:#{port_from(options)}"] + addresses = [addresses] unless addresses.is_a? Array + + addresses.map! do |address| + host_with_port?(address) ? address : "#{address}:#{port_from(@opts)}" + end + + shuffle_strategy.call addresses end # @private + def host_with_port?(address) + address.include? ':' + end + + # @private def port_from(options) fallback = if options[:tls] || options[:ssl] AMQ::Protocol::TLS_PORT else AMQ::Protocol::DEFAULT_PORT @@ -760,10 +790,20 @@ options.fetch(:port, fallback) end # @private + def host_from_address(address) + address.split(":")[0] + end + + # @private + def port_from_address(address) + address.split(":")[1].to_i + end + + # @private def vhost_from(options) options[:virtual_host] || options[:vhost] || DEFAULT_VHOST end # @private @@ -944,11 +984,11 @@ # @return [String] # @api public def to_s oid = ("0x%x" % (self.object_id << 1)) - "#<#{self.class.name}:#{oid} #{@user}@#{host}:#{@port}, vhost=#{@vhost}, hosts=[#{@hosts.join(',')}]>" + "#<#{self.class.name}:#{oid} #{@user}@#{host}:#{port}, vhost=#{@vhost}, addresses=[#{@addresses.join(',')}]>" end def inspect to_s end @@ -1107,13 +1147,17 @@ @heartbeat_sender.stop if @heartbeat_sender end # @private def initialize_transport - if host = @hosts[ @host_index ] - @host_index_mutex.synchronize { @host_index += 1 } + if address = @addresses[ @address_index ] + @address_index_mutex.synchronize { @address_index += 1 } @transport.close rescue nil # Let's make sure the previous transport socket is closed - @transport = Transport.new(self, host, @port, @opts.merge(:session_thread => @origin_thread)) + @transport = Transport.new(self, + host_from_address(address), + port_from_address(address), + @opts.merge(:session_thread => @origin_thread) + ) # Reset the cached progname for the logger only when no logger was provided @default_logger.progname = self.to_s @transport