lib/bunny/session.rb in bunny-0.9.0.pre5 vs lib/bunny/session.rb in bunny-0.9.0.pre6
- old
+ new
@@ -109,17 +109,19 @@
@transport.uses_ssl?
end
alias ssl? uses_ssl?
def start
- @status = :connecting
+ @continuations = ::Queue.new
+ @status = :connecting
self.initialize_transport
self.init_connection
self.open_connection
+ @event_loop = nil
self.start_main_loop
@default_channel = self.create_channel
end
@@ -231,13 +233,11 @@
end
def close_connection(sync = true)
@transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0))
- if @heartbeat_sender
- @heartbeat_sender.stop
- end
+ maybe_shutdown_heartbeat_sender
@status = :not_connected
if sync
@last_connection_close_ok = @continuations.pop
end
@@ -301,20 +301,80 @@
else
@channels[ch_number].handle_frameset(*frames)
end
end
+ def handle_network_failure(exception)
+ if !recovering_from_network_failure?
+ @recovering_from_network_failure = true
+ if recoverable_network_failure?(exception)
+ # puts "Recovering from a network failure..."
+ @channels.each do |n, ch|
+ ch.maybe_kill_consumer_work_pool!
+ end
+ maybe_shutdown_heartbeat_sender
+
+ recover_from_network_failure
+ else
+ # TODO: investigate if we can be a bit smarter here. MK.
+ end
+ end
+ end
+
+ def recoverable_network_failure?(exception)
+ # TODO: investigate if we can be a bit smarter here. MK.
+ true
+ end
+
+ def recovering_from_network_failure?
+ @recovering_from_network_failure
+ end
+
+ def recover_from_network_failure
+ begin
+ # puts "About to start recovery..."
+ start
+
+ if open?
+ @recovering_from_network_failure = false
+
+ recover_channels
+ end
+ rescue TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e
+ # puts "TCP connection failed, reconnecting in 5 seconds"
+ sleep 5.0
+ retry if recoverable_network_failure?(e)
+ end
+ end
+
+ def recover_channels
+ # default channel is reopened right after connection
+ # negotiation is completed, so make sure we do not try to open
+ # it twice. MK.
+ @channels.reject { |n, ch| ch == @default_channel }.each do |n, ch|
+ ch.open
+
+ ch.recover_from_network_failure
+ end
+ end
+
def send_raw(*args)
@transport.write(*args)
end
def instantiate_connection_level_exception(frame)
case frame
when AMQ::Protocol::Connection::Close then
klass = case frame.reply_code
+ when 503 then
+ InvalidCommand
when 504 then
ChannelError
+ when 504 then
+ UnexpectedFrame
+ else
+ raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}"
end
klass.new("Connection-level error: #{frame.reply_text}", self, frame)
end
end
@@ -486,16 +546,21 @@
[client_value, server_value].min
end
end
def initialize_heartbeat_sender
+ # puts "Initializing heartbeat sender..."
@heartbeat_sender = HeartbeatSender.new(@transport)
@heartbeat_sender.start(@heartbeat)
end
+ def maybe_shutdown_heartbeat_sender
+ @heartbeat_sender.stop if @heartbeat_sender
+ end
+
def initialize_transport
- @transport = Transport.new(@host, @port, @opts)
+ @transport = Transport.new(self, @host, @port, @opts)
end
# Sends AMQ protocol header (also known as preamble).
def send_preamble
@transport.send_raw(AMQ::Protocol::PREAMBLE)