lib/bunny/session.rb in bunny-1.0.0.pre1 vs lib/bunny/session.rb in bunny-1.0.0.pre2
- old
+ new
@@ -1,7 +1,8 @@
require "socket"
require "thread"
+require "monitor"
require "bunny/transport"
require "bunny/channel_id_allocator"
require "bunny/heartbeat_sender"
require "bunny/reader_loop"
@@ -142,15 +143,18 @@
@client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES
@mechanism = opts.fetch(:auth_mechanism, "PLAIN")
@credentials_encoder = credentials_encoder_for(@mechanism)
@locale = @opts.fetch(:locale, DEFAULT_LOCALE)
+
+ @mutex_impl = @opts.fetch(:mutex_impl, Monitor)
+
# mutex for the channel id => channel hash
- @channel_mutex = Mutex.new
+ @channel_mutex = @mutex_impl.new
# transport operations/continuations mutex. A workaround for
# the non-reentrant Ruby mutexes. MK.
- @transport_mutex = Mutex.new
+ @transport_mutex = @mutex_impl.new
@channels = Hash.new
@origin_thread = Thread.current
self.reset_continuations
@@ -184,10 +188,13 @@
# @return [Boolean] true if this connection uses a separate thread for I/O activity
def threaded?
@threaded
end
+ # @private
+ attr_reader :mutex_impl
+
def configure_socket(&block)
raise ArgumentError, "No block provided!" if block.nil?
@transport.configure_socket(&block)
end
@@ -257,12 +264,17 @@
def close
if @transport.open?
close_all_channels
Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) do
- self.close_connection(false)
+ self.close_connection(true)
end
+
+ maybe_shutdown_reader_loop
+ close_transport
+
+ @status = :closed
end
end
alias stop close
# Creates a temporary channel, yields it to the block given to this
@@ -368,17 +380,19 @@
end
end
# @private
def close_connection(sync = true)
- @transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0))
+ if @transport.open?
+ @transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0))
- maybe_shutdown_heartbeat_sender
- @status = :not_connected
+ maybe_shutdown_heartbeat_sender
+ @status = :not_connected
- if sync
- @last_connection_close_ok = wait_on_continuations
+ if sync
+ @last_connection_close_ok = wait_on_continuations
+ end
end
end
# @private
def handle_frame(ch_number, method)
@@ -390,20 +404,15 @@
@continuations.push(method)
when AMQ::Protocol::Connection::Close then
@last_connection_error = instantiate_connection_level_exception(method)
@continuations.push(method)
- raise @last_connection_error
+ @origin_thread.raise(@last_connection_error)
when AMQ::Protocol::Connection::CloseOk then
@last_connection_close_ok = method
begin
@continuations.clear
-
- reader_loop.stop
- @reader_loop = nil
-
- @transport.close
rescue StandardError => e
@logger.error e.class.name
@logger.error e.message
@logger.error e.backtrace
ensure
@@ -613,10 +622,30 @@
@reader_loop ||= ReaderLoop.new(@transport, self, Thread.current)
end
# @private
def maybe_shutdown_reader_loop
- @reader_loop.stop if @reader_loop
+ if @reader_loop
+ @reader_loop.stop
+ # We don't need to kill the loop but
+ # this is the easiest way to wait until the loop
+ # is guaranteed to have terminated
+ @reader_loop.kill
+ end
+
+ @reader_loop = nil
+ end
+
+ # @private
+ def close_transport
+ begin
+ @transport.close
+ rescue StandardError => e
+ @logger.error "Exception when closing transport:"
+ @logger.error e.class.name
+ @logger.error e.message
+ @logger.error e.backtrace
+ end
end
# @private
def signal_activity!
@heartbeat_sender.signal_activity! if @heartbeat_sender