lib/bunny/session.rb in bunny-1.2.1 vs lib/bunny/session.rb in bunny-1.2.2
- old
+ new
@@ -170,10 +170,11 @@
# mutex for the channel id => channel hash
@channel_mutex = @mutex_impl.new
# transport operations/continuations mutex. A workaround for
# the non-reentrant Ruby mutexes. MK.
@transport_mutex = @mutex_impl.new
+ @status_mutex = @mutex_impl.new
@channels = Hash.new
@origin_thread = Thread.current
self.reset_continuations
@@ -231,11 +232,11 @@
# @see http://rubybunny.info/articles/connecting.html
# @api public
def start
return self if connected?
- @status = :connecting
+ @status_mutex.synchronize { @status = :connecting }
# reset here for cases when automatic network recovery kicks in
# when we were blocked. MK.
@blocked = false
self.reset_continuations
@@ -257,11 +258,11 @@
@reader_loop = nil
self.start_reader_loop if threaded?
@default_channel = self.create_channel
rescue Exception => e
- @status = :not_connected
+ @status_mutex.synchronize { @status = :not_connected }
raise e
end
self
end
@@ -293,20 +294,22 @@
end
alias channel create_channel
# Closes the connection. This involves closing all of its channels.
def close
- if @transport.open?
- close_all_channels
+ @status_mutex.synchronize { @status = :closing }
- Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) do
+ ignoring_io_errors do
+ if @transport.open?
+ close_all_channels
+
self.close_connection(true)
end
- end
- clean_up_on_shutdown
- @status = :closed
+ clean_up_on_shutdown
+ end
+ @status_mutex.synchronize { @status = :closed }
end
alias stop close
# Creates a temporary channel, yields it to the block given to this
# method and closes it.
@@ -326,18 +329,26 @@
# @return [Boolean] true if this connection is still not fully open
def connecting?
status == :connecting
end
+ # @return [Boolean] true if this AMQP 0.9.1 connection is closing
+ # @api private
+ def closing?
+ @status_mutex.synchronize { @status == :closing }
+ end
+
# @return [Boolean] true if this AMQP 0.9.1 connection is closed
def closed?
- status == :closed
+ @status_mutex.synchronize { @status == :closed }
end
# @return [Boolean] true if this AMQP 0.9.1 connection is open
def open?
- (status == :open || status == :connected || status == :connecting) && @transport.open?
+ @status_mutex.synchronize do
+ (status == :open || status == :connected || status == :connecting) && @transport.open?
+ end
end
alias connected? open?
# @return [Boolean] true if this connection has automatic recovery from network failure enabled
def automatically_recover?
@@ -504,11 +515,11 @@
end
end
shut_down_all_consumer_work_pools!
maybe_shutdown_heartbeat_sender
- @status = :not_connected
+ @status_mutex.synchronize { @status = :not_connected }
end
# Handles incoming frames and dispatches them.
#
# Channel methods (`channel.open-ok`, `channel.close-ok`) are
@@ -595,11 +606,11 @@
# @private
def handle_network_failure(exception)
raise NetworkErrorWrapper.new(exception) unless @threaded
- @status = :disconnected
+ @status_mutex.synchronize { @status = :disconnected }
if !recovering_from_network_failure?
begin
@recovering_from_network_failure = true
if recoverable_network_failure?(exception)
@@ -694,11 +705,11 @@
def clean_up_and_fail_on_connection_close!(method)
@last_connection_error = instantiate_connection_level_exception(method)
@continuations.push(method)
clean_up_on_shutdown
- @origin_thread.raise(@last_connection_error)
+ @origin_thread.terminate_with(@last_connection_error)
end
def clean_up_on_shutdown
begin
shut_down_all_consumer_work_pools!
@@ -791,11 +802,11 @@
if @reader_loop
@reader_loop.stop
if threaded?
# this is the easiest way to wait until the loop
# is guaranteed to have terminated
- @reader_loop.raise(ShutdownSignal)
+ @reader_loop.terminate_with(ShutdownSignal)
# joining the thread here may take forever
# on JRuby because sun.nio.ch.KQueueArrayWrapper#kevent0 is
# a native method that cannot be (easily) interrupted.
# So we use this ugly hack or else our test suite takes forever
# to run on JRuby (a new connection is opened/closed per example). MK.
@@ -926,11 +937,11 @@
@server_capabilities = @server_properties["capabilities"]
@server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ")
@server_locales = Array(connection_start.locales)
- @status = :connected
+ @status_mutex.synchronize { @status = :connected }
end
# @private
def open_connection
@transport.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale))
@@ -1002,11 +1013,11 @@
@logger.warn "RabbitMQ closed TCP connection before AMQP 0.9.1 connection was finalized. Most likely this means authentication failure."
raise Bunny::PossibleAuthenticationFailureError.new(self.user, self.vhost, self.password.size)
end
connection_open_ok = frame2.decode_payload
- @status = :open
+ @status_mutex.synchronize { @status = :open }
if @heartbeat && @heartbeat > 0
initialize_heartbeat_sender
end
unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk)
@@ -1021,11 +1032,11 @@
@logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
ensure
close_transport
end
- @origin_thread.raise(e)
+ @origin_thread.terminate_with(e)
else
raise "could not open connection: server did not respond with connection.open-ok but #{connection_open_ok.inspect} instead"
end
end
end
@@ -1142,9 +1153,17 @@
case n
when 0 then
CHANNEL_MAX_LIMIT
else
n
+ end
+ end
+
+ def ignoring_io_errors(&block)
+ begin
+ block.call
+ rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Bunny::NetworkFailure => _
+ # ignore
end
end
end # Session
# backwards compatibility