lib/bunny/session.rb in bunny-2.0.0.rc1 vs lib/bunny/session.rb in bunny-2.0.0.rc2
- old
+ new
@@ -44,15 +44,11 @@
# backwards compatibility
# @private
CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT
# @private
- DEFAULT_CONTINUATION_TIMEOUT = if RUBY_VERSION.to_f < 1.9
- 8000
- else
- 4000
- end
+ DEFAULT_CONTINUATION_TIMEOUT = 15000
# RabbitMQ client metadata
DEFAULT_CLIENT_PROPERTIES = {
:capabilities => {
:publisher_confirms => true,
@@ -115,10 +111,12 @@
# @option connection_string_or_opts [Proc] :hosts_shuffle_strategy A Proc that reorders a list of host strings, defaults to Array#shuffle
# @option connection_string_or_opts [Logger] :logger The logger. If missing, one is created using :log_file and :log_level.
# @option connection_string_or_opts [IO, String] :log_file The file or path to use when creating a logger. Defaults to STDOUT.
# @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT.
# @option connection_string_or_opts [Integer] :log_level The log level to use when creating a logger. Defaults to LOGGER::WARN
+ # @option connection_string_or_opts [Boolean] :automatically_recover Should automatically recover from network failures?
+ # @option connection_string_or_opts [Integer] :recovery_attempts Max number of recovery attempts
#
# @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL
# @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use
#
# @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide
@@ -154,10 +152,11 @@
@automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
true
else
opts[:automatically_recover] || opts[:automatic_recovery]
end
+ @recovery_attempts = opts[:recovery_attempts]
@network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
@recover_from_connection_close = opts.fetch(:recover_from_connection_close, false)
# in ms
@continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)
@@ -663,15 +662,23 @@
reset_host_index
retry
rescue TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e
@logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds"
sleep @network_recovery_interval
- retry if recoverable_network_failure?(e)
+ if should_retry_recovery?
+ @recovery_attempts -= 1 if @recovery_attempts
+ retry if recoverable_network_failure?(e)
+ end
end
end
# @private
+ def should_retry_recovery?
+ @recovery_attempts.nil? || @recovery_attempts > 1
+ end
+
+ # @private
def recover_channels
# default channel is reopened right after connection
# negotiation is completed, so make sure we do not try to open
# it twice. MK.
@channels.each do |n, ch|
@@ -896,10 +903,12 @@
# threads publish on the same channel aggressively, at some point frames will be
# delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
# If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
# locking. Note that "single frame" methods do not need this kind of synchronization. MK.
channel.synchronize do
- frames.each { |frame| self.send_frame(frame, false) }
+ # see rabbitmq/rabbitmq-server#156
+ data = frames.reduce("") { |acc, frame| acc << frame.encode }
+ @transport.write(data)
signal_activity!
end
end # send_frameset(frames)
# Sends multiple frames, one by one. For thread safety this method takes a channel