lib/bunny/session.rb in bunny-2.11.0 vs lib/bunny/session.rb in bunny-2.12.0.rc1
- old
+ new
@@ -76,11 +76,11 @@
# API
#
# @return [Bunny::Transport]
attr_reader :transport
- attr_reader :status, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :channel_max, :threaded
+ attr_reader :status, :heartbeat, :user, :pass, :vhost, :frame_max, :channel_max, :threaded
attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales
attr_reader :channel_id_allocator
# Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
# @return [String]
attr_reader :mechanism
@@ -150,19 +150,21 @@
@logger = opts.fetch(:logger, init_default_logger_without_progname(log_file, log_level))
@addresses = self.addresses_from(opts)
@address_index = 0
- # re-init, see above
- @logger = opts.fetch(:logger, init_default_logger(log_file, log_level))
-
+ @transport = nil
@user = self.username_from(opts)
@pass = self.password_from(opts)
@vhost = self.vhost_from(opts)
@threaded = opts.fetch(:threaded, true)
+ # re-init, see above
+ @logger = opts.fetch(:logger, init_default_logger(log_file, log_level))
+
validate_connection_options(opts)
+ @last_connection_error = nil
# should automatic recovery from network failures be used?
@automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
true
else
@@ -186,10 +188,11 @@
# these are negotiated with the broker during the connection tuning phase
@client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
@client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX))
# will be-renegotiated during connection tuning steps. MK.
@channel_max = @client_channel_max
+ @heartbeat_sender = nil
@client_heartbeat = self.heartbeat_from(opts)
client_props = opts[:properties] || opts[:client_properties] || {}
@client_properties = DEFAULT_CLIENT_PROPERTIES.merge(client_props)
@mechanism = normalize_auth_mechanism(opts.fetch(:auth_mechanism, "PLAIN"))
@@ -306,14 +309,10 @@
@transport.maybe_initialize_socket
@transport.post_initialize_socket
@transport.connect
- if @socket_configurator
- @transport.configure_socket(&@socket_configurator)
- end
-
self.init_connection
self.open_connection
@reader_loop = nil
self.start_reader_loop if threaded?
@@ -416,11 +415,11 @@
# @return [Boolean] true if this AMQP 0.9.1 connection is closed
def closed?
@status_mutex.synchronize { @status == :closed }
end
- # @return [Boolean] true if this AMQP 0.9.1 connection has been programmatically closed
+ # @return [Boolean] true if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server)
def manually_closed?
@status_mutex.synchronize { @manually_closed == true }
end
# @return [Boolean] true if this AMQP 0.9.1 connection is open
@@ -755,10 +754,11 @@
announce_network_failure_recovery
retry
end
else
@logger.error "Ran out of recovery attempts (limit set to #{@max_recovery_attempts})"
+ self.close
end
end
# @private
def recovery_attempts_limited?
@@ -1076,13 +1076,17 @@
# locking. Note that "single frame" methods technically do not need this kind of synchronization
# (no incorrect frame interleaving of the same kind as with basic.publish isn't possible) but we
# still recommend not sharing channels between threads except for consumer-only cases in the docs. MK.
channel.synchronize do
# see rabbitmq/rabbitmq-server#156
- data = frames.reduce("") { |acc, frame| acc << frame.encode }
- @transport.write(data)
- signal_activity!
+ if open?
+ data = frames.reduce("") { |acc, frame| acc << frame.encode }
+ @transport.write(data)
+ signal_activity!
+ else
+ raise ConnectionClosedError.new(frames)
+ end
end
end # send_frameset(frames)
# Sends multiple frames, one by one. For thread safety this method takes a channel
# object and synchronizes on it. Uses transport implementation that does not perform
@@ -1094,11 +1098,15 @@
# threads publish on the same channel aggressively, at some point frames will be
# delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
# If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
# locking. See a note about "single frame" methods in a comment in `send_frameset`. MK.
channel.synchronize do
- frames.each { |frame| self.send_frame_without_timeout(frame, false) }
- signal_activity!
+ if open?
+ frames.each { |frame| self.send_frame_without_timeout(frame, false) }
+ signal_activity!
+ else
+ raise ConnectionClosedError.new(frames)
+ end
end
end # send_frameset_without_timeout(frames)
# @private
def send_raw_without_timeout(data, channel)