lib/bunny/session.rb in bunny-0.9.0.pre7 vs lib/bunny/session.rb in bunny-0.9.0.pre8
- old
+ new
@@ -99,10 +99,17 @@
@vhost = self.vhost_from(opts)
@logfile = opts[:logfile]
@logging = opts[:logging] || false
@threaded = opts.fetch(:threaded, true)
+ # should automatic recovery from network failures be used?
+ @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
+ true
+ else
+ opts[:automatically_recover] || opts[:automatic_recovery]
+ end
+
@status = :not_connected
# these are negotiated with the broker during the connection tuning phase
@client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
@client_channel_max = opts.fetch(:channel_max, 65536)
@@ -110,11 +117,13 @@
@client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES
@mechanism = opts.fetch(:auth_mechanism, "PLAIN")
@credentials_encoder = credentials_encoder_for(@mechanism)
@locale = @opts.fetch(:locale, DEFAULT_LOCALE)
+ # mutex for the channel id => channel hash
@channel_mutex = Mutex.new
+ @network_mutex = Mutex.new
@channels = Hash.new
@continuations = ::Queue.new
end
@@ -208,10 +217,14 @@
def open?
(status == :open || status == :connected) && @transport.open?
end
alias connected? open?
+ def automatically_recover?
+ @automatically_recover
+ end
+
#
# Backwards compatibility
#
# @private
@@ -252,11 +265,13 @@
# @private
def open_channel(ch)
n = ch.number
self.register_channel(ch)
- @transport.send_frame(AMQ::Protocol::Channel::Open.encode(n, AMQ::Protocol::EMPTY_STRING))
+ @channel_mutex.synchronize do
+ @transport.send_frame(AMQ::Protocol::Channel::Open.encode(n, AMQ::Protocol::EMPTY_STRING))
+ end
@last_channel_open_ok = wait_on_continuations
raise_if_continuation_resulted_in_a_connection_error!
@last_channel_open_ok
end
@@ -315,12 +330,11 @@
rescue StandardError => e
puts e.class.name
puts e.message
puts e.backtrace
ensure
- @active_continuation.notify_all if @active_continuation
- @active_continuation = false
+ @continuations.push(nil)
end
when AMQ::Protocol::Channel::Close then
begin
ch = @channels[ch_number]
ch.handle_method(method)
@@ -418,12 +432,12 @@
ch.recover_from_network_failure
end
end
# @private
- def send_raw(*args)
- @transport.write(*args)
+ def send_raw(data)
+ @transport.write(data)
end
# @private
def instantiate_connection_level_exception(frame)
case frame
@@ -511,11 +525,11 @@
event_loop.start
end
# @private
def event_loop
- @event_loop ||= MainLoop.new(@transport, self)
+ @event_loop ||= MainLoop.new(@transport, self, Thread.current)
end
# @private
def signal_activity!
@heartbeat_sender.signal_activity! if @heartbeat_sender
@@ -529,14 +543,28 @@
# @private
def send_frame(frame)
if closed?
raise ConnectionClosedError.new(frame)
else
- @transport.send_raw(frame.encode)
+ @network_mutex.synchronize { @transport.write(frame.encode) }
end
end
+ # Sends frame to the peer, checking that connection is open.
+ # Uses transport implementation that does not perform
+ # timeout control. Exposed primarily for Bunny::Channel.
+ #
+ # @raise [ConnectionClosedError]
+ # @private
+ def send_frame_without_timeout(frame)
+ if closed?
+ raise ConnectionClosedError.new(frame)
+ else
+ @network_mutex.synchronize { @transport.write_without_timeout(frame.encode) }
+ end
+ end
+
# Sends multiple frames, one by one. For thread safety this method takes a channel
# object and synchronizes on it.
#
# @api private
def send_frameset(frames, channel)
@@ -544,15 +572,31 @@
# threads publish on the same channel aggressively, at some point frames will be
# delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
# If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
# locking. Note that "single frame" methods do not need this kind of synchronization. MK.
channel.synchronize do
- frames.each { |frame| @transport.send_frame(frame) }
+ frames.each { |frame| self.send_frame(frame) }
@transport.flush
end
end # send_frameset(frames)
+ # Sends multiple frames, one by one. For thread safety this method takes a channel
+ # object and synchronizes on it. Uses transport implementation that does not perform
+ # timeout control.
+ #
+ # @api private
+ def send_frameset_without_timeout(frames, channel)
+ # some developers end up sharing channels between threads and when multiple
+ # threads publish on the same channel aggressively, at some point frames will be
+ # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
+ # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
+ # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
+ channel.synchronize do
+ frames.each { |frame| self.send_frame_without_timeout(frame) }
+ end
+ end # send_frameset_without_timeout(frames)
+
protected
# @api private
def init_connection
self.send_preamble
@@ -574,11 +618,11 @@
frame = begin
@transport.read_next_frame
# frame timeout means the broker has closed the TCP connection, which it
# does per 0.9.1 spec.
- rescue Errno::ECONNRESET, ClientTimeout, AMQ::Protocol::EmptyResponseError, EOFError => e
+ rescue Errno::ECONNRESET, ClientTimeout, AMQ::Protocol::EmptyResponseError, EOFError, IOError => e
nil
end
if frame.nil?
@state = :closed
raise Bunny::PossibleAuthenticationFailureError.new(self.user, self.vhost, self.password.size)
@@ -642,10 +686,10 @@
@heartbeat_sender.stop if @heartbeat_sender
end
# @api private
def initialize_transport
- @transport = Transport.new(self, @host, @port, @opts)
+ @transport = Transport.new(self, @host, @port, @opts.merge(:session_thread => Thread.current))
end
# Sends AMQ protocol header (also known as preamble).
# @api private
def send_preamble