lib/bunny/session.rb in bunny-1.1.0.rc1 vs lib/bunny/session.rb in bunny-1.1.0

- old
+ new

@@ -273,16 +273,18 @@ # # @return [Bunny::Channel] Newly opened channel def create_channel(n = nil, consumer_pool_size = 1) raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n - if n && (ch = @channels[n]) - ch - else - ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1)) - ch.open - ch + @channel_mutex.synchronize do + if n && (ch = @channels[n]) + ch + else + ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1)) + ch.open + ch + end end end alias channel create_channel # Closes the connection. This involves closing all of its channels. @@ -468,18 +470,20 @@ @last_channel_open_ok end # @private def close_channel(ch) - n = ch.number + @channel_mutex.synchronize do + n = ch.number - @transport.send_frame(AMQ::Protocol::Channel::Close.encode(n, 200, "Goodbye", 0, 0)) - @last_channel_close_ok = wait_on_continuations - raise_if_continuation_resulted_in_a_connection_error! + @transport.send_frame(AMQ::Protocol::Channel::Close.encode(n, 200, "Goodbye", 0, 0)) + @last_channel_close_ok = wait_on_continuations + raise_if_continuation_resulted_in_a_connection_error! - self.unregister_channel(ch) - @last_channel_close_ok + self.unregister_channel(ch) + @last_channel_close_ok + end end # @private def close_all_channels @channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch| @@ -634,11 +638,11 @@ @recovering_from_network_failure = false recover_channels end rescue TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e - @logger.warn "TCP connection failed, reconnecting in 5 seconds" + @logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds" sleep @network_recovery_interval retry if recoverable_network_failure?(e) end end @@ -805,10 +809,13 @@ # # @raise [ConnectionClosedError] # @private def send_frame(frame, signal_activity = true) if open? + # @transport_mutex.synchronize do + # @transport.write(frame.encode) + # end @transport.write(frame.encode) signal_activity! if signal_activity else raise ConnectionClosedError.new(frame) end @@ -902,11 +909,15 @@ def open_connection @transport.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale)) @logger.debug "Sent connection.start-ok" frame = begin - @transport.read_next_frame + fr = @transport.read_next_frame + while fr.is_a?(AMQ::Protocol::HeartbeatFrame) + fr = @transport.read_next_frame + end + fr # frame timeout means the broker has closed the TCP connection, which it # does per 0.9.1 spec. rescue Errno::ECONNRESET, ClientTimeout, AMQ::Protocol::EmptyResponseError, EOFError, IOError => e nil end @@ -944,10 +955,14 @@ @logger.debug "Sent connection.tune-ok with heartbeat interval = #{@heartbeat}, frame_max = #{@frame_max}, channel_max = #{@channel_max}" @transport.send_frame(AMQ::Protocol::Connection::Open.encode(self.vhost)) @logger.debug "Sent connection.open with vhost = #{self.vhost}" frame2 = begin - @transport.read_next_frame + fr = @transport.read_next_frame + while fr.is_a?(AMQ::Protocol::HeartbeatFrame) + fr = @transport.read_next_frame + end + fr # frame timeout means the broker has closed the TCP connection, which it # does per 0.9.1 spec. rescue Errno::ECONNRESET, ClientTimeout, AMQ::Protocol::EmptyResponseError, EOFError => e nil end