lib/bunny/session.rb in bunny-2.6.1 vs lib/bunny/session.rb in bunny-2.6.2

- old
+ new

@@ -274,23 +274,20 @@ # # @see http://rubybunny.info/articles/getting_started.html # @see http://rubybunny.info/articles/connecting.html # @api public def start - return self if connected? @status_mutex.synchronize { @status = :connecting } # reset here for cases when automatic network recovery kicks in # when we were blocked. MK. @blocked = false self.reset_continuations begin - begin - # close existing transport if we have one, # to not leak sockets @transport.maybe_initialize_socket @transport.post_initialize_socket @@ -494,20 +491,22 @@ # Implementation # # @private def open_channel(ch) - n = ch.number - self.register_channel(ch) + @channel_mutex.synchronize do + n = ch.number + self.register_channel(ch) - @transport_mutex.synchronize do - @transport.send_frame(AMQ::Protocol::Channel::Open.encode(n, AMQ::Protocol::EMPTY_STRING)) - end - @last_channel_open_ok = wait_on_continuations - raise_if_continuation_resulted_in_a_connection_error! + @transport_mutex.synchronize do + @transport.send_frame(AMQ::Protocol::Channel::Open.encode(n, AMQ::Protocol::EMPTY_STRING)) + end + @last_channel_open_ok = wait_on_continuations + raise_if_continuation_resulted_in_a_connection_error! - @last_channel_open_ok + @last_channel_open_ok + end end # @private def close_channel(ch) @channel_mutex.synchronize do @@ -523,12 +522,14 @@ end end # @private def close_all_channels - @channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch| - Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close } + @channel_mutex.synchronize do + @channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch| + Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close } + end end end # @private def close_connection(sync = true) @@ -638,12 +639,14 @@ if !recovering_from_network_failure? begin @recovering_from_network_failure = true if recoverable_network_failure?(exception) @logger.warn "Recovering from a network failure..." - @channels.each do |n, ch| - ch.maybe_kill_consumer_work_pool! + @channel_mutex.synchronize do + @channels.each do |n, ch| + ch.maybe_kill_consumer_work_pool! + end end @reader_loop.stop if @reader_loop maybe_shutdown_heartbeat_sender recover_from_network_failure @@ -699,14 +702,16 @@ @recovery_attempts.nil? || @recovery_attempts > 1 end # @private def recover_channels - @channels.each do |n, ch| - ch.open + @channel_mutex.synchronize do + @channels.each do |n, ch| + ch.open - ch.recover_from_network_failure + ch.recover_from_network_failure + end end end # @private def instantiate_connection_level_exception(frame) @@ -769,14 +774,14 @@ addresses = options[:host] || options[:hostname] || options[:addresses] || options[:hosts] || ["#{DEFAULT_HOST}:#{port_from(options)}"] addresses = [addresses] unless addresses.is_a? Array - addresses.map! do |address| + addrs = addresses.map do |address| host_with_port?(address) ? address : "#{address}:#{port_from(@opts)}" end - shuffle_strategy.call addresses + shuffle_strategy.call(addrs) end # @private def port_from(options) fallback = if options[:tls] || options[:ssl]