lib/bunny/session.rb in bunny-1.2.1 vs lib/bunny/session.rb in bunny-1.2.2

- old
+ new

@@ -170,10 +170,11 @@ # mutex for the channel id => channel hash @channel_mutex = @mutex_impl.new # transport operations/continuations mutex. A workaround for # the non-reentrant Ruby mutexes. MK. @transport_mutex = @mutex_impl.new + @status_mutex = @mutex_impl.new @channels = Hash.new @origin_thread = Thread.current self.reset_continuations @@ -231,11 +232,11 @@ # @see http://rubybunny.info/articles/connecting.html # @api public def start return self if connected? - @status = :connecting + @status_mutex.synchronize { @status = :connecting } # reset here for cases when automatic network recovery kicks in # when we were blocked. MK. @blocked = false self.reset_continuations @@ -257,11 +258,11 @@ @reader_loop = nil self.start_reader_loop if threaded? @default_channel = self.create_channel rescue Exception => e - @status = :not_connected + @status_mutex.synchronize { @status = :not_connected } raise e end self end @@ -293,20 +294,22 @@ end alias channel create_channel # Closes the connection. This involves closing all of its channels. def close - if @transport.open? - close_all_channels + @status_mutex.synchronize { @status = :closing } - Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) do + ignoring_io_errors do + if @transport.open? + close_all_channels + self.close_connection(true) end - end - clean_up_on_shutdown - @status = :closed + clean_up_on_shutdown + end + @status_mutex.synchronize { @status = :closed } end alias stop close # Creates a temporary channel, yields it to the block given to this # method and closes it. @@ -326,18 +329,26 @@ # @return [Boolean] true if this connection is still not fully open def connecting? status == :connecting end + # @return [Boolean] true if this AMQP 0.9.1 connection is closing + # @api private + def closing? + @status_mutex.synchronize { @status == :closing } + end + # @return [Boolean] true if this AMQP 0.9.1 connection is closed def closed? - status == :closed + @status_mutex.synchronize { @status == :closed } end # @return [Boolean] true if this AMQP 0.9.1 connection is open def open? - (status == :open || status == :connected || status == :connecting) && @transport.open? + @status_mutex.synchronize do + (status == :open || status == :connected || status == :connecting) && @transport.open? + end end alias connected? open? # @return [Boolean] true if this connection has automatic recovery from network failure enabled def automatically_recover? @@ -504,11 +515,11 @@ end end shut_down_all_consumer_work_pools! maybe_shutdown_heartbeat_sender - @status = :not_connected + @status_mutex.synchronize { @status = :not_connected } end # Handles incoming frames and dispatches them. # # Channel methods (`channel.open-ok`, `channel.close-ok`) are @@ -595,11 +606,11 @@ # @private def handle_network_failure(exception) raise NetworkErrorWrapper.new(exception) unless @threaded - @status = :disconnected + @status_mutex.synchronize { @status = :disconnected } if !recovering_from_network_failure? begin @recovering_from_network_failure = true if recoverable_network_failure?(exception) @@ -694,11 +705,11 @@ def clean_up_and_fail_on_connection_close!(method) @last_connection_error = instantiate_connection_level_exception(method) @continuations.push(method) clean_up_on_shutdown - @origin_thread.raise(@last_connection_error) + @origin_thread.terminate_with(@last_connection_error) end def clean_up_on_shutdown begin shut_down_all_consumer_work_pools! @@ -791,11 +802,11 @@ if @reader_loop @reader_loop.stop if threaded? # this is the easiest way to wait until the loop # is guaranteed to have terminated - @reader_loop.raise(ShutdownSignal) + @reader_loop.terminate_with(ShutdownSignal) # joining the thread here may take forever # on JRuby because sun.nio.ch.KQueueArrayWrapper#kevent0 is # a native method that cannot be (easily) interrupted. # So we use this ugly hack or else our test suite takes forever # to run on JRuby (a new connection is opened/closed per example). MK. @@ -926,11 +937,11 @@ @server_capabilities = @server_properties["capabilities"] @server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ") @server_locales = Array(connection_start.locales) - @status = :connected + @status_mutex.synchronize { @status = :connected } end # @private def open_connection @transport.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale)) @@ -1002,11 +1013,11 @@ @logger.warn "RabbitMQ closed TCP connection before AMQP 0.9.1 connection was finalized. Most likely this means authentication failure." raise Bunny::PossibleAuthenticationFailureError.new(self.user, self.vhost, self.password.size) end connection_open_ok = frame2.decode_payload - @status = :open + @status_mutex.synchronize { @status = :open } if @heartbeat && @heartbeat > 0 initialize_heartbeat_sender end unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk) @@ -1021,11 +1032,11 @@ @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}" ensure close_transport end - @origin_thread.raise(e) + @origin_thread.terminate_with(e) else raise "could not open connection: server did not respond with connection.open-ok but #{connection_open_ok.inspect} instead" end end end @@ -1142,9 +1153,17 @@ case n when 0 then CHANNEL_MAX_LIMIT else n + end + end + + def ignoring_io_errors(&block) + begin + block.call + rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Bunny::NetworkFailure => _ + # ignore end end end # Session # backwards compatibility