lib/bunny/session.rb in bunny-1.1.5 vs lib/bunny/session.rb in bunny-1.1.6

- old
+ new

@@ -144,10 +144,11 @@ true else opts[:automatically_recover] || opts[:automatic_recovery] end @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL) + @recover_from_connection_close = opts.fetch(:recover_from_connection_close, false) # in ms @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT) @status = :not_connected @blocked = false @@ -217,10 +218,15 @@ raise ArgumentError, "No block provided!" if block.nil? @transport.configure_socket(&block) end + # @return [Integer] Client socket port + def local_port + @transport.local_address.ip_port + end + # Starts the connection process. # # @see http://rubybunny.info/articles/getting_started.html # @see http://rubybunny.info/articles/connecting.html # @api public @@ -517,25 +523,17 @@ when AMQ::Protocol::Channel::OpenOk then @continuations.push(method) when AMQ::Protocol::Channel::CloseOk then @continuations.push(method) when AMQ::Protocol::Connection::Close then - @last_connection_error = instantiate_connection_level_exception(method) - @continuations.push(method) - - begin - shut_down_all_consumer_work_pools! - maybe_shutdown_reader_loop - rescue ShutdownSignal => sse - # no-op - rescue Exception => e - @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}" - ensure - close_transport + if recover_from_connection_close? + @logger.warn "Recovering from connection.close (#{method.reply_text})" + clean_up_on_shutdown + handle_network_failure(instantiate_connection_level_exception(method)) + else + clean_up_and_fail_on_connection_close!(method) end - - @origin_thread.raise(@last_connection_error) when AMQ::Protocol::Connection::CloseOk then @last_connection_close_ok = method begin @continuations.clear rescue StandardError => e @@ -589,27 +587,36 @@ @channels[ch_number].handle_frameset(*frames) end end # @private + def recover_from_connection_close? + @recover_from_connection_close + end + + # @private def handle_network_failure(exception) raise NetworkErrorWrapper.new(exception) unless @threaded @status = :disconnected if !recovering_from_network_failure? - @recovering_from_network_failure = true - if recoverable_network_failure?(exception) - @logger.warn "Recovering from a network failure..." - @channels.each do |n, ch| - ch.maybe_kill_consumer_work_pool! - end - maybe_shutdown_heartbeat_sender + begin + @recovering_from_network_failure = true + if recoverable_network_failure?(exception) + @logger.warn "Recovering from a network failure..." + @channels.each do |n, ch| + ch.maybe_kill_consumer_work_pool! + end + maybe_shutdown_heartbeat_sender - recover_from_network_failure - else - # TODO: investigate if we can be a bit smarter here. MK. + recover_from_network_failure + else + # TODO: investigate if we can be a bit smarter here. MK. + end + ensure + @recovering_from_network_failure = false end end end # @private @@ -679,9 +686,30 @@ else raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}" end klass.new("Connection-level error: #{frame.reply_text}", self, frame) + end + end + + def clean_up_and_fail_on_connection_close!(method) + @last_connection_error = instantiate_connection_level_exception(method) + @continuations.push(method) + + clean_up_on_shutdown + @origin_thread.raise(@last_connection_error) + end + + def clean_up_on_shutdown + begin + shut_down_all_consumer_work_pools! + maybe_shutdown_reader_loop + rescue ShutdownSignal => sse + # no-op + rescue Exception => e + @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}" + ensure + close_transport end end # @private def hostname_from(options)