lib/bunny/session.rb in bunny-2.11.0 vs lib/bunny/session.rb in bunny-2.12.0.rc1

- old
+ new

@@ -76,11 +76,11 @@ # API # # @return [Bunny::Transport] attr_reader :transport - attr_reader :status, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :channel_max, :threaded + attr_reader :status, :heartbeat, :user, :pass, :vhost, :frame_max, :channel_max, :threaded attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales attr_reader :channel_id_allocator # Authentication mechanism, e.g. "PLAIN" or "EXTERNAL" # @return [String] attr_reader :mechanism @@ -150,19 +150,21 @@ @logger = opts.fetch(:logger, init_default_logger_without_progname(log_file, log_level)) @addresses = self.addresses_from(opts) @address_index = 0 - # re-init, see above - @logger = opts.fetch(:logger, init_default_logger(log_file, log_level)) - + @transport = nil @user = self.username_from(opts) @pass = self.password_from(opts) @vhost = self.vhost_from(opts) @threaded = opts.fetch(:threaded, true) + # re-init, see above + @logger = opts.fetch(:logger, init_default_logger(log_file, log_level)) + validate_connection_options(opts) + @last_connection_error = nil # should automatic recovery from network failures be used? @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil? true else @@ -186,10 +188,11 @@ # these are negotiated with the broker during the connection tuning phase @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX)) # will be-renegotiated during connection tuning steps. MK. @channel_max = @client_channel_max + @heartbeat_sender = nil @client_heartbeat = self.heartbeat_from(opts) client_props = opts[:properties] || opts[:client_properties] || {} @client_properties = DEFAULT_CLIENT_PROPERTIES.merge(client_props) @mechanism = normalize_auth_mechanism(opts.fetch(:auth_mechanism, "PLAIN")) @@ -306,14 +309,10 @@ @transport.maybe_initialize_socket @transport.post_initialize_socket @transport.connect - if @socket_configurator - @transport.configure_socket(&@socket_configurator) - end - self.init_connection self.open_connection @reader_loop = nil self.start_reader_loop if threaded? @@ -416,11 +415,11 @@ # @return [Boolean] true if this AMQP 0.9.1 connection is closed def closed? @status_mutex.synchronize { @status == :closed } end - # @return [Boolean] true if this AMQP 0.9.1 connection has been programmatically closed + # @return [Boolean] true if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server) def manually_closed? @status_mutex.synchronize { @manually_closed == true } end # @return [Boolean] true if this AMQP 0.9.1 connection is open @@ -755,10 +754,11 @@ announce_network_failure_recovery retry end else @logger.error "Ran out of recovery attempts (limit set to #{@max_recovery_attempts})" + self.close end end # @private def recovery_attempts_limited? @@ -1076,13 +1076,17 @@ # locking. Note that "single frame" methods technically do not need this kind of synchronization # (no incorrect frame interleaving of the same kind as with basic.publish isn't possible) but we # still recommend not sharing channels between threads except for consumer-only cases in the docs. MK. channel.synchronize do # see rabbitmq/rabbitmq-server#156 - data = frames.reduce("") { |acc, frame| acc << frame.encode } - @transport.write(data) - signal_activity! + if open? + data = frames.reduce("") { |acc, frame| acc << frame.encode } + @transport.write(data) + signal_activity! + else + raise ConnectionClosedError.new(frames) + end end end # send_frameset(frames) # Sends multiple frames, one by one. For thread safety this method takes a channel # object and synchronizes on it. Uses transport implementation that does not perform @@ -1094,11 +1098,15 @@ # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained # locking. See a note about "single frame" methods in a comment in `send_frameset`. MK. channel.synchronize do - frames.each { |frame| self.send_frame_without_timeout(frame, false) } - signal_activity! + if open? + frames.each { |frame| self.send_frame_without_timeout(frame, false) } + signal_activity! + else + raise ConnectionClosedError.new(frames) + end end end # send_frameset_without_timeout(frames) # @private def send_raw_without_timeout(data, channel)