lib/bunny/session.rb in bunny-0.9.0.pre10 vs lib/bunny/session.rb in bunny-0.9.0.pre11

- old
+ new

@@ -2,11 +2,11 @@ require "thread" require "bunny/transport" require "bunny/channel_id_allocator" require "bunny/heartbeat_sender" -require "bunny/main_loop" +require "bunny/reader_loop" require "bunny/authentication/credentials_encoder" require "bunny/authentication/plain_mechanism_encoder" require "bunny/authentication/external_mechanism_encoder" if defined?(JRUBY_VERSION) @@ -68,10 +68,14 @@ attr_reader :default_channel attr_reader :channel_id_allocator # Authentication mechanism, e.g. "PLAIN" or "EXTERNAL" # @return [String] attr_reader :mechanism + # @return [Logger] + attr_reader :logger + # @return [Integer] Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 4000. + attr_reader :continuation_timeout # @param [String, Hash] connection_string_or_opts Connection string or a hash of connection options # @param [Hash] optz Extra options not related to connection # @@ -101,21 +105,24 @@ @host = self.hostname_from(opts) @port = self.port_from(opts) @user = self.username_from(opts) @pass = self.password_from(opts) @vhost = self.vhost_from(opts) - @logfile = opts[:logfile] - @logging = opts[:logging] || false + @logfile = opts[:log_file] || opts[:logfile] || STDOUT @threaded = opts.fetch(:threaded, true) + self.init_logger(opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN) + # should automatic recovery from network failures be used? @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil? true else opts[:automatically_recover] || opts[:automatic_recovery] end @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL) + # in ms + @continuation_timeout = opts.fetch(:continuation_timeout, 4000) @status = :not_connected # these are negotiated with the broker during the connection tuning phase @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) @@ -126,11 +133,10 @@ @mechanism = opts.fetch(:auth_mechanism, "PLAIN") @credentials_encoder = credentials_encoder_for(@mechanism) @locale = @opts.fetch(:locale, DEFAULT_LOCALE) # mutex for the channel id => channel hash @channel_mutex = Mutex.new - @network_mutex = Mutex.new @channels = Hash.new self.reset_continuations end @@ -178,12 +184,12 @@ self.initialize_transport self.init_connection self.open_connection - @event_loop = nil - self.start_main_loop if @threaded + @reader_loop = nil + self.start_reader_loop if @threaded @default_channel = self.create_channel rescue Exception => e @status = :not_connected raise e @@ -199,15 +205,15 @@ # Opens a new channel and returns it. This method will block the calling # thread until the response is received and the channel is guaranteed to be # opened (this operation is very fast and inexpensive). # # @return [Bunny::Channel] Newly opened channel - def create_channel(n = nil) + def create_channel(n = nil, consumer_pool_size = 1) if n && (ch = @channels[n]) ch else - ch = Bunny::Channel.new(self, n) + ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1)) ch.open ch end end alias channel create_channel @@ -339,11 +345,11 @@ end end # @private def handle_frame(ch_number, method) - # puts "Session#handle_frame on #{ch_number}: #{method.inspect}" + @logger.debug "Session#handle_frame on #{ch_number}: #{method.inspect}" case method when AMQ::Protocol::Channel::OpenOk then @continuations.push(method) when AMQ::Protocol::Channel::CloseOk then @continuations.push(method) @@ -355,18 +361,18 @@ when AMQ::Protocol::Connection::CloseOk then @last_connection_close_ok = method begin @continuations.clear - event_loop.stop - @event_loop = nil + reader_loop.stop + @reader_loop = nil @transport.close rescue StandardError => e - puts e.class.name - puts e.message - puts e.backtrace + @logger.error e.class.name + @logger.error e.message + @logger.error e.backtrace ensure @continuations.push(:__unblock__) end when AMQ::Protocol::Channel::Close then begin @@ -379,11 +385,11 @@ @channels[ch_number].handle_basic_get_empty(method) else if ch = @channels[ch_number] ch.handle_method(method) else - # TODO: log a warning + @logger.warn "Channel #{ch_number} is not open on this connection!" end end end # @private @@ -413,11 +419,11 @@ @status = :disconnected if !recovering_from_network_failure? @recovering_from_network_failure = true if recoverable_network_failure?(exception) - # puts "Recovering from a network failure..." + @logger.warn "Recovering from a network failure..." @channels.each do |n, ch| ch.maybe_kill_consumer_work_pool! end maybe_shutdown_heartbeat_sender @@ -441,20 +447,20 @@ # @private def recover_from_network_failure begin sleep @network_recovery_interval - # puts "About to start recovery..." + @logger.debug "About to start connection recovery..." start if open? @recovering_from_network_failure = false recover_channels end rescue TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e - # puts "TCP connection failed, reconnecting in 5 seconds" + @logger.warn "TCP connection failed, reconnecting in 5 seconds" sleep @network_recovery_interval retry if recoverable_network_failure?(e) end end @@ -469,21 +475,18 @@ ch.recover_from_network_failure end end # @private - def send_raw(data) - @transport.write(data) - end - - # @private def instantiate_connection_level_exception(frame) case frame when AMQ::Protocol::Connection::Close then klass = case frame.reply_code when 320 then ConnectionForced + when 501 then + FrameError when 503 then InvalidCommand when 504 then ChannelError when 505 then @@ -558,22 +561,22 @@ @channels.delete(ch.number) end end # @private - def start_main_loop - event_loop.start + def start_reader_loop + reader_loop.start end # @private - def event_loop - @event_loop ||= MainLoop.new(@transport, self, Thread.current) + def reader_loop + @reader_loop ||= ReaderLoop.new(@transport, self, Thread.current) end # @private - def maybe_shutdown_main_loop - @event_loop.stop if @event_loop + def maybe_shutdown_reader_loop + @reader_loop.stop if @reader_loop end # @private def signal_activity! @heartbeat_sender.signal_activity! if @heartbeat_sender @@ -583,29 +586,31 @@ # Sends frame to the peer, checking that connection is open. # Exposed primarily for Bunny::Channel # # @raise [ConnectionClosedError] # @private - def send_frame(frame) + def send_frame(frame, signal_activity = true) if closed? raise ConnectionClosedError.new(frame) else - @network_mutex.synchronize { @transport.write(frame.encode) } + @transport.write(frame.encode) + signal_activity! if signal_activity end end # Sends frame to the peer, checking that connection is open. # Uses transport implementation that does not perform # timeout control. Exposed primarily for Bunny::Channel. # # @raise [ConnectionClosedError] # @private - def send_frame_without_timeout(frame) + def send_frame_without_timeout(frame, signal_activity = true) if closed? raise ConnectionClosedError.new(frame) else - @network_mutex.synchronize { @transport.write_without_timeout(frame.encode) } + @transport.write_without_timeout(frame.encode) + signal_activity! if signal_activity end end # Sends multiple frames, one by one. For thread safety this method takes a channel # object and synchronizes on it. @@ -616,12 +621,12 @@ # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained # locking. Note that "single frame" methods do not need this kind of synchronization. MK. channel.synchronize do - frames.each { |frame| self.send_frame(frame) } - @transport.flush + frames.each { |frame| self.send_frame(frame, false) } + signal_activity! end end # send_frameset(frames) # Sends multiple frames, one by one. For thread safety this method takes a channel # object and synchronizes on it. Uses transport implementation that does not perform @@ -633,14 +638,21 @@ # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained # locking. Note that "single frame" methods do not need this kind of synchronization. MK. channel.synchronize do - frames.each { |frame| self.send_frame_without_timeout(frame) } + frames.each { |frame| self.send_frame_without_timeout(frame, false) } + signal_activity! end end # send_frameset_without_timeout(frames) + # @return [String] + # @api public + def to_s + "Bunny::Session #{@user}@#{@host}:#{@port}, vhost=#{@vhost}" + end + protected # @api private def init_connection self.send_preamble @@ -657,20 +669,22 @@ end # @api private def open_connection @transport.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale)) + @logger.debug "Sent connection.start-ok" frame = begin @transport.read_next_frame # frame timeout means the broker has closed the TCP connection, which it # does per 0.9.1 spec. rescue Errno::ECONNRESET, ClientTimeout, AMQ::Protocol::EmptyResponseError, EOFError, IOError => e nil end if frame.nil? @state = :closed + @logger.error "RabbitMQ closed TCP connection before AMQP 0.9.1 connection was finalized. Most likely this means authentication failure." raise Bunny::PossibleAuthenticationFailureError.new(self.user, self.vhost, self.password.size) end connection_tune = frame.decode_payload @@ -680,25 +694,30 @@ @heartbeat = if heartbeat_disabled?(@client_heartbeat) 0 else negotiate_value(@client_heartbeat, connection_tune.heartbeat) end + @logger.debug "Heartbeat interval negotiation: client = #{@client_heartbeat}, server = #{connection_tune.heartbeat}, result = #{@heartbeat}" + @logger.info "Heartbeat interval used (in seconds): #{@heartbeat}" @channel_id_allocator = ChannelIdAllocator.new(@channel_max) @transport.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, @frame_max, @heartbeat)) + @logger.debug "Sent connection.tune-ok with heartbeat interval = #{@heartbeat}, frame_max = #{@frame_max}, channel_max = #{@channel_max}" @transport.send_frame(AMQ::Protocol::Connection::Open.encode(self.vhost)) + @logger.debug "Sent connection.open with vhost = #{self.vhost}" frame2 = begin @transport.read_next_frame # frame timeout means the broker has closed the TCP connection, which it # does per 0.9.1 spec. rescue Errno::ECONNRESET, ClientTimeout, AMQ::Protocol::EmptyResponseError, EOFError => e nil end if frame2.nil? @state = :closed + @logger.warn "RabbitMQ closed TCP connection before AMQP 0.9.1 connection was finalized. Most likely this means authentication failure." raise Bunny::PossibleAuthenticationFailureError.new(self.user, self.vhost, self.password.size) end connection_open_ok = frame2.decode_payload @status = :open @@ -724,12 +743,12 @@ end end # @api private def initialize_heartbeat_sender - # puts "Initializing heartbeat sender..." - @heartbeat_sender = HeartbeatSender.new(@transport) + @logger.debug "Initializing heartbeat sender..." + @heartbeat_sender = HeartbeatSender.new(@transport, @logger) @heartbeat_sender.start(@heartbeat) end # @api private def maybe_shutdown_heartbeat_sender @@ -747,11 +766,12 @@ end # Sends AMQ protocol header (also known as preamble). # @api private def send_preamble - @transport.send_raw(AMQ::Protocol::PREAMBLE) + @transport.write(AMQ::Protocol::PREAMBLE) + @logger.debug "Sent protocol preamble" end # @api private def encode_credentials(username, password) @@ -761,25 +781,50 @@ # @api private def credentials_encoder_for(mechanism) Authentication::CredentialsEncoder.for_session(self) end - # @api private - def reset_continuations - @continuations = if defined?(JRUBY_VERSION) - Concurrent::LinkedContinuationQueue.new - else - Concurrent::ContinuationQueue.new - end + if defined?(JRUBY_VERSION) + # @api private + def reset_continuations + @continuations = Concurrent::LinkedContinuationQueue.new + end + else + # @api private + def reset_continuations + @continuations = Concurrent::ContinuationQueue.new + end end # @api private def wait_on_continuations unless @threaded - event_loop.run_once until @continuations.length > 0 + reader_loop.run_once until @continuations.length > 0 end - @continuations.pop + @continuations.poll(@continuation_timeout) + end + + # @api private + def init_logger(level) + @logger = ::Logger.new(@logfile) + @logger.level = normalize_log_level(level) + @logger.progname = self.to_s + + @logger + end + + # @api private + def normalize_log_level(level) + case level + when :debug, Logger::DEBUG, "debug" then Logger::DEBUG + when :info, Logger::INFO, "info" then Logger::INFO + when :warn, Logger::WARN, "warn" then Logger::WARN + when :error, Logger::ERROR, "error" then Logger::ERROR + when :fatal, Logger::FATAL, "fatal" then Logger::FATAL + else + Logger::WARN + end end end # Session # backwards compatibility Client = Session