lib/bunny/session.rb in bunny-2.6.1 vs lib/bunny/session.rb in bunny-2.6.2
- old
+ new
@@ -274,23 +274,20 @@
#
# @see http://rubybunny.info/articles/getting_started.html
# @see http://rubybunny.info/articles/connecting.html
# @api public
def start
-
return self if connected?
@status_mutex.synchronize { @status = :connecting }
# reset here for cases when automatic network recovery kicks in
# when we were blocked. MK.
@blocked = false
self.reset_continuations
begin
-
begin
-
# close existing transport if we have one,
# to not leak sockets
@transport.maybe_initialize_socket
@transport.post_initialize_socket
@@ -494,20 +491,22 @@
# Implementation
#
# @private
def open_channel(ch)
- n = ch.number
- self.register_channel(ch)
+ @channel_mutex.synchronize do
+ n = ch.number
+ self.register_channel(ch)
- @transport_mutex.synchronize do
- @transport.send_frame(AMQ::Protocol::Channel::Open.encode(n, AMQ::Protocol::EMPTY_STRING))
- end
- @last_channel_open_ok = wait_on_continuations
- raise_if_continuation_resulted_in_a_connection_error!
+ @transport_mutex.synchronize do
+ @transport.send_frame(AMQ::Protocol::Channel::Open.encode(n, AMQ::Protocol::EMPTY_STRING))
+ end
+ @last_channel_open_ok = wait_on_continuations
+ raise_if_continuation_resulted_in_a_connection_error!
- @last_channel_open_ok
+ @last_channel_open_ok
+ end
end
# @private
def close_channel(ch)
@channel_mutex.synchronize do
@@ -523,12 +522,14 @@
end
end
# @private
def close_all_channels
- @channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch|
- Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
+ @channel_mutex.synchronize do
+ @channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch|
+ Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
+ end
end
end
# @private
def close_connection(sync = true)
@@ -638,12 +639,14 @@
if !recovering_from_network_failure?
begin
@recovering_from_network_failure = true
if recoverable_network_failure?(exception)
@logger.warn "Recovering from a network failure..."
- @channels.each do |n, ch|
- ch.maybe_kill_consumer_work_pool!
+ @channel_mutex.synchronize do
+ @channels.each do |n, ch|
+ ch.maybe_kill_consumer_work_pool!
+ end
end
@reader_loop.stop if @reader_loop
maybe_shutdown_heartbeat_sender
recover_from_network_failure
@@ -699,14 +702,16 @@
@recovery_attempts.nil? || @recovery_attempts > 1
end
# @private
def recover_channels
- @channels.each do |n, ch|
- ch.open
+ @channel_mutex.synchronize do
+ @channels.each do |n, ch|
+ ch.open
- ch.recover_from_network_failure
+ ch.recover_from_network_failure
+ end
end
end
# @private
def instantiate_connection_level_exception(frame)
@@ -769,14 +774,14 @@
addresses = options[:host] || options[:hostname] || options[:addresses] ||
options[:hosts] || ["#{DEFAULT_HOST}:#{port_from(options)}"]
addresses = [addresses] unless addresses.is_a? Array
- addresses.map! do |address|
+ addrs = addresses.map do |address|
host_with_port?(address) ? address : "#{address}:#{port_from(@opts)}"
end
- shuffle_strategy.call addresses
+ shuffle_strategy.call(addrs)
end
# @private
def port_from(options)
fallback = if options[:tls] || options[:ssl]