lib/bunny/session.rb in bunny-1.4.1 vs lib/bunny/session.rb in bunny-1.5.0.pre1

- old
+ new

@@ -80,13 +80,12 @@ # API # # @return [Bunny::Transport] attr_reader :transport - attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :channel_max, :threaded + attr_reader :status, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :channel_max, :threaded attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales - attr_reader :default_channel attr_reader :channel_id_allocator # Authentication mechanism, e.g. "PLAIN" or "EXTERNAL" # @return [String] attr_reader :mechanism # @return [Logger] @@ -97,22 +96,25 @@ # @param [String, Hash] connection_string_or_opts Connection string or a hash of connection options # @param [Hash] optz Extra options not related to connection # # @option connection_string_or_opts [String] :host ("127.0.0.1") Hostname or IP address to connect to + # @option connection_string_or_opts [Array<String>] :hosts (["127.0.0.1"]) list of hostname or IP addresses to select hostname from when connecting # @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on # @option connection_string_or_opts [String] :username ("guest") Username # @option connection_string_or_opts [String] :password ("guest") Password # @option connection_string_or_opts [String] :vhost ("/") Virtual host to use # @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat. # @option connection_string_or_opts [Integer] :network_recovery_interval (4) Recovery interval periodic network recovery will use. This includes initial pause after network failure. # @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used? # @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem) # @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem) # @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration + # @option connection_string_or_opts [String] :verify_peer (true) Whether TLS peer verification should be performed # @option connection_string_or_opts [Integer] :continuation_timeout (4000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds. # @option connection_string_or_opts [Integer] :connection_timeout (5) Timeout in seconds for connecting to the server. + # @option connection_string_or_opts [Proc] :hosts_shuffle_strategy A Proc that reorders a list of host strings, defaults to Array#shuffle # # @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL # @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use # # @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide @@ -126,12 +128,16 @@ self.class.parse_uri(ENV["RABBITMQ_URL"] || connection_string_or_opts) when Hash then connection_string_or_opts end.merge(optz) + @default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle } + @opts = opts - @host = self.hostname_from(opts) + @hosts = self.hostnames_from(opts) + @host_index = 0 + @port = self.port_from(opts) @user = self.username_from(opts) @pass = self.password_from(opts) @vhost = self.vhost_from(opts) @logfile = opts[:log_file] || opts[:logfile] || STDOUT @@ -171,10 +177,12 @@ @channel_mutex = @mutex_impl.new # transport operations/continuations mutex. A workaround for # the non-reentrant Ruby mutexes. MK. @transport_mutex = @mutex_impl.new @status_mutex = @mutex_impl.new + @host_index_mutex = @mutex_impl.new + @channels = Hash.new @origin_thread = Thread.current self.reset_continuations @@ -208,10 +216,18 @@ # @return [Boolean] true if this connection uses a separate thread for I/O activity def threaded? @threaded end + def host + @transport ? @transport.host : @hosts[@host_index] + end + + def reset_host_index + @host_index_mutex.synchronize { @host_index = 0 } + end + # @private attr_reader :mutex_impl # Provides a way to fine tune the socket used by connection. # Accepts a block that the socket will be yielded to. @@ -230,40 +246,56 @@ # # @see http://rubybunny.info/articles/getting_started.html # @see http://rubybunny.info/articles/connecting.html # @api public def start + return self if connected? @status_mutex.synchronize { @status = :connecting } # reset here for cases when automatic network recovery kicks in # when we were blocked. MK. @blocked = false self.reset_continuations begin - # close existing transport if we have one, - # to not leak sockets - @transport.maybe_initialize_socket - @transport.post_initialize_socket - @transport.connect + begin - if @socket_configurator - @transport.configure_socket(&@socket_configurator) - end + # close existing transport if we have one, + # to not leak sockets + @transport.maybe_initialize_socket - self.init_connection - self.open_connection + @transport.post_initialize_socket + @transport.connect - @reader_loop = nil - self.start_reader_loop if threaded? + if @socket_configurator + @transport.configure_socket(&@socket_configurator) + end - @default_channel = self.create_channel unless @default_channel - rescue Exception => e + self.init_connection + self.open_connection + + @reader_loop = nil + self.start_reader_loop if threaded? + + rescue TCPConnectionFailed => e + self.initialize_transport + + @logger.warn e.message + @logger.warn "Retrying connection on next host in line: #{@transport.host}:#{@transport.port}" + + return self.start + rescue Exception + @status_mutex.synchronize { @status = :not_connected } + raise + end + + rescue HostListDepleted + self.reset_host_index @status_mutex.synchronize { @status = :not_connected } - raise e + raise TCPConnectionFailedForAllHosts end self end @@ -353,44 +385,10 @@ # @return [Boolean] true if this connection has automatic recovery from network failure enabled def automatically_recover? @automatically_recover end - # - # Backwards compatibility - # - - # @private - def queue(*args) - @default_channel.queue(*args) - end - - # @private - def direct(*args) - @default_channel.direct(*args) - end - - # @private - def fanout(*args) - @default_channel.fanout(*args) - end - - # @private - def topic(*args) - @default_channel.topic(*args) - end - - # @private - def headers(*args) - @default_channel.headers(*args) - end - - # @private - def exchange(*args) - @default_channel.exchange(*args) - end - # Defines a callback that will be executed when RabbitMQ blocks the connection # because it is running low on memory or disk space (as configured via config file # and/or rabbitmqctl). # # @yield [AMQ::Protocol::Connection::Blocked] connection.blocked method which provides a reason for blocking @@ -644,27 +642,32 @@ # @private def recover_from_network_failure begin sleep @network_recovery_interval @logger.debug "About to start connection recovery..." + + self.reset_host_index # since we are starting a fresh try. self.initialize_transport self.start if open? @recovering_from_network_failure = false recover_channels end - rescue TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e + rescue TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e @logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds" sleep @network_recovery_interval retry if recoverable_network_failure?(e) end end # @private def recover_channels + # default channel is reopened right after connection + # negotiation is completed, so make sure we do not try to open + # it twice. MK. @channels.each do |n, ch| ch.open ch.recover_from_network_failure end @@ -724,12 +727,14 @@ close_transport end end # @private - def hostname_from(options) - options[:host] || options[:hostname] || DEFAULT_HOST + def hostnames_from(options) + options.fetch(:hosts_shuffle_strategy, @default_hosts_shuffle_strategy).call( + [ options[:hosts] || options[:host] || options[:hostname] || DEFAULT_HOST ].flatten + ) end # @private def port_from(options) fallback = if options[:tls] || options[:ssl] @@ -921,11 +926,11 @@ end # send_frameset_without_timeout(frames) # @return [String] # @api public def to_s - "#<#{self.class.name}:#{object_id} #{@user}@#{@host}:#{@port}, vhost=#{@vhost}>" + "#<#{self.class.name}:#{object_id} #{@user}@#{host}:#{@port}, vhost=#{@vhost}, hosts=[#{@hosts.join(',')}]>" end protected # @private @@ -1074,10 +1079,15 @@ @heartbeat_sender.stop if @heartbeat_sender end # @private def initialize_transport - @transport = Transport.new(self, @host, @port, @opts.merge(:session_thread => @origin_thread)) + if host = @hosts[ @host_index ] + @host_index_mutex.synchronize { @host_index += 1 } + @transport = Transport.new(self, host, @port, @opts.merge(:session_thread => @origin_thread)) + else + raise HostListDepleted + end end # @private def maybe_close_transport @transport.close if @transport