lib/bunny/session.rb in bunny-0.9.0.pre5 vs lib/bunny/session.rb in bunny-0.9.0.pre6

- old
+ new

@@ -109,17 +109,19 @@ @transport.uses_ssl? end alias ssl? uses_ssl? def start - @status = :connecting + @continuations = ::Queue.new + @status = :connecting self.initialize_transport self.init_connection self.open_connection + @event_loop = nil self.start_main_loop @default_channel = self.create_channel end @@ -231,13 +233,11 @@ end def close_connection(sync = true) @transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0)) - if @heartbeat_sender - @heartbeat_sender.stop - end + maybe_shutdown_heartbeat_sender @status = :not_connected if sync @last_connection_close_ok = @continuations.pop end @@ -301,20 +301,80 @@ else @channels[ch_number].handle_frameset(*frames) end end + def handle_network_failure(exception) + if !recovering_from_network_failure? + @recovering_from_network_failure = true + if recoverable_network_failure?(exception) + # puts "Recovering from a network failure..." + @channels.each do |n, ch| + ch.maybe_kill_consumer_work_pool! + end + maybe_shutdown_heartbeat_sender + + recover_from_network_failure + else + # TODO: investigate if we can be a bit smarter here. MK. + end + end + end + + def recoverable_network_failure?(exception) + # TODO: investigate if we can be a bit smarter here. MK. + true + end + + def recovering_from_network_failure? + @recovering_from_network_failure + end + + def recover_from_network_failure + begin + # puts "About to start recovery..." + start + + if open? + @recovering_from_network_failure = false + + recover_channels + end + rescue TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e + # puts "TCP connection failed, reconnecting in 5 seconds" + sleep 5.0 + retry if recoverable_network_failure?(e) + end + end + + def recover_channels + # default channel is reopened right after connection + # negotiation is completed, so make sure we do not try to open + # it twice. MK. + @channels.reject { |n, ch| ch == @default_channel }.each do |n, ch| + ch.open + + ch.recover_from_network_failure + end + end + def send_raw(*args) @transport.write(*args) end def instantiate_connection_level_exception(frame) case frame when AMQ::Protocol::Connection::Close then klass = case frame.reply_code + when 503 then + InvalidCommand when 504 then ChannelError + when 504 then + UnexpectedFrame + else + raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}" end klass.new("Connection-level error: #{frame.reply_text}", self, frame) end end @@ -486,16 +546,21 @@ [client_value, server_value].min end end def initialize_heartbeat_sender + # puts "Initializing heartbeat sender..." @heartbeat_sender = HeartbeatSender.new(@transport) @heartbeat_sender.start(@heartbeat) end + def maybe_shutdown_heartbeat_sender + @heartbeat_sender.stop if @heartbeat_sender + end + def initialize_transport - @transport = Transport.new(@host, @port, @opts) + @transport = Transport.new(self, @host, @port, @opts) end # Sends AMQ protocol header (also known as preamble). def send_preamble @transport.send_raw(AMQ::Protocol::PREAMBLE)