lib/bunny/session.rb in bunny-1.0.0.pre1 vs lib/bunny/session.rb in bunny-1.0.0.pre2

- old
+ new

@@ -1,7 +1,8 @@ require "socket" require "thread" +require "monitor" require "bunny/transport" require "bunny/channel_id_allocator" require "bunny/heartbeat_sender" require "bunny/reader_loop" @@ -142,15 +143,18 @@ @client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES @mechanism = opts.fetch(:auth_mechanism, "PLAIN") @credentials_encoder = credentials_encoder_for(@mechanism) @locale = @opts.fetch(:locale, DEFAULT_LOCALE) + + @mutex_impl = @opts.fetch(:mutex_impl, Monitor) + # mutex for the channel id => channel hash - @channel_mutex = Mutex.new + @channel_mutex = @mutex_impl.new # transport operations/continuations mutex. A workaround for # the non-reentrant Ruby mutexes. MK. - @transport_mutex = Mutex.new + @transport_mutex = @mutex_impl.new @channels = Hash.new @origin_thread = Thread.current self.reset_continuations @@ -184,10 +188,13 @@ # @return [Boolean] true if this connection uses a separate thread for I/O activity def threaded? @threaded end + # @private + attr_reader :mutex_impl + def configure_socket(&block) raise ArgumentError, "No block provided!" if block.nil? @transport.configure_socket(&block) end @@ -257,12 +264,17 @@ def close if @transport.open? close_all_channels Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) do - self.close_connection(false) + self.close_connection(true) end + + maybe_shutdown_reader_loop + close_transport + + @status = :closed end end alias stop close # Creates a temporary channel, yields it to the block given to this @@ -368,17 +380,19 @@ end end # @private def close_connection(sync = true) - @transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0)) + if @transport.open? + @transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0)) - maybe_shutdown_heartbeat_sender - @status = :not_connected + maybe_shutdown_heartbeat_sender + @status = :not_connected - if sync - @last_connection_close_ok = wait_on_continuations + if sync + @last_connection_close_ok = wait_on_continuations + end end end # @private def handle_frame(ch_number, method) @@ -390,20 +404,15 @@ @continuations.push(method) when AMQ::Protocol::Connection::Close then @last_connection_error = instantiate_connection_level_exception(method) @continuations.push(method) - raise @last_connection_error + @origin_thread.raise(@last_connection_error) when AMQ::Protocol::Connection::CloseOk then @last_connection_close_ok = method begin @continuations.clear - - reader_loop.stop - @reader_loop = nil - - @transport.close rescue StandardError => e @logger.error e.class.name @logger.error e.message @logger.error e.backtrace ensure @@ -613,10 +622,30 @@ @reader_loop ||= ReaderLoop.new(@transport, self, Thread.current) end # @private def maybe_shutdown_reader_loop - @reader_loop.stop if @reader_loop + if @reader_loop + @reader_loop.stop + # We don't need to kill the loop but + # this is the easiest way to wait until the loop + # is guaranteed to have terminated + @reader_loop.kill + end + + @reader_loop = nil + end + + # @private + def close_transport + begin + @transport.close + rescue StandardError => e + @logger.error "Exception when closing transport:" + @logger.error e.class.name + @logger.error e.message + @logger.error e.backtrace + end end # @private def signal_activity! @heartbeat_sender.signal_activity! if @heartbeat_sender