lib/bunny/session.rb in bunny-1.1.0.rc1 vs lib/bunny/session.rb in bunny-1.1.0
- old
+ new
@@ -273,16 +273,18 @@
#
# @return [Bunny::Channel] Newly opened channel
def create_channel(n = nil, consumer_pool_size = 1)
raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n
- if n && (ch = @channels[n])
- ch
- else
- ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1))
- ch.open
- ch
+ @channel_mutex.synchronize do
+ if n && (ch = @channels[n])
+ ch
+ else
+ ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1))
+ ch.open
+ ch
+ end
end
end
alias channel create_channel
# Closes the connection. This involves closing all of its channels.
@@ -468,18 +470,20 @@
@last_channel_open_ok
end
# @private
def close_channel(ch)
- n = ch.number
+ @channel_mutex.synchronize do
+ n = ch.number
- @transport.send_frame(AMQ::Protocol::Channel::Close.encode(n, 200, "Goodbye", 0, 0))
- @last_channel_close_ok = wait_on_continuations
- raise_if_continuation_resulted_in_a_connection_error!
+ @transport.send_frame(AMQ::Protocol::Channel::Close.encode(n, 200, "Goodbye", 0, 0))
+ @last_channel_close_ok = wait_on_continuations
+ raise_if_continuation_resulted_in_a_connection_error!
- self.unregister_channel(ch)
- @last_channel_close_ok
+ self.unregister_channel(ch)
+ @last_channel_close_ok
+ end
end
# @private
def close_all_channels
@channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch|
@@ -634,11 +638,11 @@
@recovering_from_network_failure = false
recover_channels
end
rescue TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e
- @logger.warn "TCP connection failed, reconnecting in 5 seconds"
+ @logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds"
sleep @network_recovery_interval
retry if recoverable_network_failure?(e)
end
end
@@ -805,10 +809,13 @@
#
# @raise [ConnectionClosedError]
# @private
def send_frame(frame, signal_activity = true)
if open?
+ # @transport_mutex.synchronize do
+ # @transport.write(frame.encode)
+ # end
@transport.write(frame.encode)
signal_activity! if signal_activity
else
raise ConnectionClosedError.new(frame)
end
@@ -902,11 +909,15 @@
def open_connection
@transport.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale))
@logger.debug "Sent connection.start-ok"
frame = begin
- @transport.read_next_frame
+ fr = @transport.read_next_frame
+ while fr.is_a?(AMQ::Protocol::HeartbeatFrame)
+ fr = @transport.read_next_frame
+ end
+ fr
# frame timeout means the broker has closed the TCP connection, which it
# does per 0.9.1 spec.
rescue Errno::ECONNRESET, ClientTimeout, AMQ::Protocol::EmptyResponseError, EOFError, IOError => e
nil
end
@@ -944,10 +955,14 @@
@logger.debug "Sent connection.tune-ok with heartbeat interval = #{@heartbeat}, frame_max = #{@frame_max}, channel_max = #{@channel_max}"
@transport.send_frame(AMQ::Protocol::Connection::Open.encode(self.vhost))
@logger.debug "Sent connection.open with vhost = #{self.vhost}"
frame2 = begin
- @transport.read_next_frame
+ fr = @transport.read_next_frame
+ while fr.is_a?(AMQ::Protocol::HeartbeatFrame)
+ fr = @transport.read_next_frame
+ end
+ fr
# frame timeout means the broker has closed the TCP connection, which it
# does per 0.9.1 spec.
rescue Errno::ECONNRESET, ClientTimeout, AMQ::Protocol::EmptyResponseError, EOFError => e
nil
end