lib/bunny/session.rb in bunny-0.9.0.pre7 vs lib/bunny/session.rb in bunny-0.9.0.pre8

- old
+ new

@@ -99,10 +99,17 @@ @vhost = self.vhost_from(opts) @logfile = opts[:logfile] @logging = opts[:logging] || false @threaded = opts.fetch(:threaded, true) + # should automatic recovery from network failures be used? + @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil? + true + else + opts[:automatically_recover] || opts[:automatic_recovery] + end + @status = :not_connected # these are negotiated with the broker during the connection tuning phase @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) @client_channel_max = opts.fetch(:channel_max, 65536) @@ -110,11 +117,13 @@ @client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES @mechanism = opts.fetch(:auth_mechanism, "PLAIN") @credentials_encoder = credentials_encoder_for(@mechanism) @locale = @opts.fetch(:locale, DEFAULT_LOCALE) + # mutex for the channel id => channel hash @channel_mutex = Mutex.new + @network_mutex = Mutex.new @channels = Hash.new @continuations = ::Queue.new end @@ -208,10 +217,14 @@ def open? (status == :open || status == :connected) && @transport.open? end alias connected? open? + def automatically_recover? + @automatically_recover + end + # # Backwards compatibility # # @private @@ -252,11 +265,13 @@ # @private def open_channel(ch) n = ch.number self.register_channel(ch) - @transport.send_frame(AMQ::Protocol::Channel::Open.encode(n, AMQ::Protocol::EMPTY_STRING)) + @channel_mutex.synchronize do + @transport.send_frame(AMQ::Protocol::Channel::Open.encode(n, AMQ::Protocol::EMPTY_STRING)) + end @last_channel_open_ok = wait_on_continuations raise_if_continuation_resulted_in_a_connection_error! @last_channel_open_ok end @@ -315,12 +330,11 @@ rescue StandardError => e puts e.class.name puts e.message puts e.backtrace ensure - @active_continuation.notify_all if @active_continuation - @active_continuation = false + @continuations.push(nil) end when AMQ::Protocol::Channel::Close then begin ch = @channels[ch_number] ch.handle_method(method) @@ -418,12 +432,12 @@ ch.recover_from_network_failure end end # @private - def send_raw(*args) - @transport.write(*args) + def send_raw(data) + @transport.write(data) end # @private def instantiate_connection_level_exception(frame) case frame @@ -511,11 +525,11 @@ event_loop.start end # @private def event_loop - @event_loop ||= MainLoop.new(@transport, self) + @event_loop ||= MainLoop.new(@transport, self, Thread.current) end # @private def signal_activity! @heartbeat_sender.signal_activity! if @heartbeat_sender @@ -529,14 +543,28 @@ # @private def send_frame(frame) if closed? raise ConnectionClosedError.new(frame) else - @transport.send_raw(frame.encode) + @network_mutex.synchronize { @transport.write(frame.encode) } end end + # Sends frame to the peer, checking that connection is open. + # Uses transport implementation that does not perform + # timeout control. Exposed primarily for Bunny::Channel. + # + # @raise [ConnectionClosedError] + # @private + def send_frame_without_timeout(frame) + if closed? + raise ConnectionClosedError.new(frame) + else + @network_mutex.synchronize { @transport.write_without_timeout(frame.encode) } + end + end + # Sends multiple frames, one by one. For thread safety this method takes a channel # object and synchronizes on it. # # @api private def send_frameset(frames, channel) @@ -544,15 +572,31 @@ # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained # locking. Note that "single frame" methods do not need this kind of synchronization. MK. channel.synchronize do - frames.each { |frame| @transport.send_frame(frame) } + frames.each { |frame| self.send_frame(frame) } @transport.flush end end # send_frameset(frames) + # Sends multiple frames, one by one. For thread safety this method takes a channel + # object and synchronizes on it. Uses transport implementation that does not perform + # timeout control. + # + # @api private + def send_frameset_without_timeout(frames, channel) + # some developers end up sharing channels between threads and when multiple + # threads publish on the same channel aggressively, at some point frames will be + # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. + # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained + # locking. Note that "single frame" methods do not need this kind of synchronization. MK. + channel.synchronize do + frames.each { |frame| self.send_frame_without_timeout(frame) } + end + end # send_frameset_without_timeout(frames) + protected # @api private def init_connection self.send_preamble @@ -574,11 +618,11 @@ frame = begin @transport.read_next_frame # frame timeout means the broker has closed the TCP connection, which it # does per 0.9.1 spec. - rescue Errno::ECONNRESET, ClientTimeout, AMQ::Protocol::EmptyResponseError, EOFError => e + rescue Errno::ECONNRESET, ClientTimeout, AMQ::Protocol::EmptyResponseError, EOFError, IOError => e nil end if frame.nil? @state = :closed raise Bunny::PossibleAuthenticationFailureError.new(self.user, self.vhost, self.password.size) @@ -642,10 +686,10 @@ @heartbeat_sender.stop if @heartbeat_sender end # @api private def initialize_transport - @transport = Transport.new(self, @host, @port, @opts) + @transport = Transport.new(self, @host, @port, @opts.merge(:session_thread => Thread.current)) end # Sends AMQ protocol header (also known as preamble). # @api private def send_preamble