lib/bunny/session.rb in bunny-1.1.5 vs lib/bunny/session.rb in bunny-1.1.6
- old
+ new
@@ -144,10 +144,11 @@
true
else
opts[:automatically_recover] || opts[:automatic_recovery]
end
@network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
+ @recover_from_connection_close = opts.fetch(:recover_from_connection_close, false)
# in ms
@continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)
@status = :not_connected
@blocked = false
@@ -217,10 +218,15 @@
raise ArgumentError, "No block provided!" if block.nil?
@transport.configure_socket(&block)
end
+ # @return [Integer] Client socket port
+ def local_port
+ @transport.local_address.ip_port
+ end
+
# Starts the connection process.
#
# @see http://rubybunny.info/articles/getting_started.html
# @see http://rubybunny.info/articles/connecting.html
# @api public
@@ -517,25 +523,17 @@
when AMQ::Protocol::Channel::OpenOk then
@continuations.push(method)
when AMQ::Protocol::Channel::CloseOk then
@continuations.push(method)
when AMQ::Protocol::Connection::Close then
- @last_connection_error = instantiate_connection_level_exception(method)
- @continuations.push(method)
-
- begin
- shut_down_all_consumer_work_pools!
- maybe_shutdown_reader_loop
- rescue ShutdownSignal => sse
- # no-op
- rescue Exception => e
- @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
- ensure
- close_transport
+ if recover_from_connection_close?
+ @logger.warn "Recovering from connection.close (#{method.reply_text})"
+ clean_up_on_shutdown
+ handle_network_failure(instantiate_connection_level_exception(method))
+ else
+ clean_up_and_fail_on_connection_close!(method)
end
-
- @origin_thread.raise(@last_connection_error)
when AMQ::Protocol::Connection::CloseOk then
@last_connection_close_ok = method
begin
@continuations.clear
rescue StandardError => e
@@ -589,27 +587,36 @@
@channels[ch_number].handle_frameset(*frames)
end
end
# @private
+ def recover_from_connection_close?
+ @recover_from_connection_close
+ end
+
+ # @private
def handle_network_failure(exception)
raise NetworkErrorWrapper.new(exception) unless @threaded
@status = :disconnected
if !recovering_from_network_failure?
- @recovering_from_network_failure = true
- if recoverable_network_failure?(exception)
- @logger.warn "Recovering from a network failure..."
- @channels.each do |n, ch|
- ch.maybe_kill_consumer_work_pool!
- end
- maybe_shutdown_heartbeat_sender
+ begin
+ @recovering_from_network_failure = true
+ if recoverable_network_failure?(exception)
+ @logger.warn "Recovering from a network failure..."
+ @channels.each do |n, ch|
+ ch.maybe_kill_consumer_work_pool!
+ end
+ maybe_shutdown_heartbeat_sender
- recover_from_network_failure
- else
- # TODO: investigate if we can be a bit smarter here. MK.
+ recover_from_network_failure
+ else
+ # TODO: investigate if we can be a bit smarter here. MK.
+ end
+ ensure
+ @recovering_from_network_failure = false
end
end
end
# @private
@@ -679,9 +686,30 @@
else
raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}"
end
klass.new("Connection-level error: #{frame.reply_text}", self, frame)
+ end
+ end
+
+ def clean_up_and_fail_on_connection_close!(method)
+ @last_connection_error = instantiate_connection_level_exception(method)
+ @continuations.push(method)
+
+ clean_up_on_shutdown
+ @origin_thread.raise(@last_connection_error)
+ end
+
+ def clean_up_on_shutdown
+ begin
+ shut_down_all_consumer_work_pools!
+ maybe_shutdown_reader_loop
+ rescue ShutdownSignal => sse
+ # no-op
+ rescue Exception => e
+ @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
+ ensure
+ close_transport
end
end
# @private
def hostname_from(options)