lib/bunny/session.rb in bunny-2.0.0.rc1 vs lib/bunny/session.rb in bunny-2.0.0.rc2

- old
+ new

@@ -44,15 +44,11 @@ # backwards compatibility # @private CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT # @private - DEFAULT_CONTINUATION_TIMEOUT = if RUBY_VERSION.to_f < 1.9 - 8000 - else - 4000 - end + DEFAULT_CONTINUATION_TIMEOUT = 15000 # RabbitMQ client metadata DEFAULT_CLIENT_PROPERTIES = { :capabilities => { :publisher_confirms => true, @@ -115,10 +111,12 @@ # @option connection_string_or_opts [Proc] :hosts_shuffle_strategy A Proc that reorders a list of host strings, defaults to Array#shuffle # @option connection_string_or_opts [Logger] :logger The logger. If missing, one is created using :log_file and :log_level. # @option connection_string_or_opts [IO, String] :log_file The file or path to use when creating a logger. Defaults to STDOUT. # @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT. # @option connection_string_or_opts [Integer] :log_level The log level to use when creating a logger. Defaults to LOGGER::WARN + # @option connection_string_or_opts [Boolean] :automatically_recover Should automatically recover from network failures? + # @option connection_string_or_opts [Integer] :recovery_attempts Max number of recovery attempts # # @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL # @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use # # @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide @@ -154,10 +152,11 @@ @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil? true else opts[:automatically_recover] || opts[:automatic_recovery] end + @recovery_attempts = opts[:recovery_attempts] @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL) @recover_from_connection_close = opts.fetch(:recover_from_connection_close, false) # in ms @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT) @@ -663,15 +662,23 @@ reset_host_index retry rescue TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e @logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds" sleep @network_recovery_interval - retry if recoverable_network_failure?(e) + if should_retry_recovery? + @recovery_attempts -= 1 if @recovery_attempts + retry if recoverable_network_failure?(e) + end end end # @private + def should_retry_recovery? + @recovery_attempts.nil? || @recovery_attempts > 1 + end + + # @private def recover_channels # default channel is reopened right after connection # negotiation is completed, so make sure we do not try to open # it twice. MK. @channels.each do |n, ch| @@ -896,10 +903,12 @@ # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained # locking. Note that "single frame" methods do not need this kind of synchronization. MK. channel.synchronize do - frames.each { |frame| self.send_frame(frame, false) } + # see rabbitmq/rabbitmq-server#156 + data = frames.reduce("") { |acc, frame| acc << frame.encode } + @transport.write(data) signal_activity! end end # send_frameset(frames) # Sends multiple frames, one by one. For thread safety this method takes a channel