lib/mongo/server/monitor.rb in mongo-2.12.4 vs lib/mongo/server/monitor.rb in mongo-2.13.0.beta1

- old
+ new

@@ -1,6 +1,6 @@ -# Copyright (C) 2014-2019 MongoDB Inc. +# Copyright (C) 2014-2020 MongoDB Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # @@ -20,26 +20,27 @@ # # Does all work in a background thread so as to not interfere with other # operations performed by the driver. # # @since 2.0.0 + # @api private class Monitor include Loggable extend Forwardable include Event::Publisher include BackgroundThread - # The default time for a server to refresh its status is 10 seconds. + # The default interval between server status refreshes is 10 seconds. # # @since 2.0.0 - HEARTBEAT_FREQUENCY = 10.freeze + DEFAULT_HEARTBEAT_INTERVAL = 10.freeze # The minimum time between forced server scans. Is # minHeartbeatFrequencyMS in the SDAM spec. # # @since 2.0.0 - MIN_SCAN_FREQUENCY = 0.5.freeze + MIN_SCAN_INTERVAL = 0.5.freeze # The weighting factor (alpha) for calculating the average moving round trip time. # # @since 2.0.0 # @deprecated Will be removed in version 3.0. @@ -57,10 +58,12 @@ # @param [ Monitoring ] monitoring The monitoring.. # @param [ Hash ] options The options. # # @option options [ Float ] :connect_timeout The timeout, in seconds, to # use when establishing the monitoring connection. + # @option options [ Float ] :heartbeat_interval The interval between + # regular server checks. # @option options [ Logger ] :logger A custom logger to use. # @option options [ Float ] :socket_timeout The timeout, in seconds, to # execute operations on the monitoring connection. # # @since 2.0.0 @@ -71,14 +74,12 @@ end @server = server @event_listeners = event_listeners @monitoring = monitoring @options = options.freeze - # This is a Mongo::Server::Monitor::Connection - @connection = Connection.new(server.address, options) @mutex = Mutex.new - @scan_started_at = nil + @next_earliest_scan = @next_wanted_scan = Time.now end # @return [ Server ] server The server that this monitor is monitoring. # @api private attr_reader :server @@ -87,33 +88,29 @@ attr_reader :connection # @return [ Hash ] options The server options. attr_reader :options + # The interval between regular server checks. + # + # @return [ Float ] The heartbeat interval, in seconds. + def heartbeat_interval + options[:heartbeat_interval] || DEFAULT_HEARTBEAT_INTERVAL + end + # @deprecated def_delegators :server, :last_scan - # The compressor is determined during the handshake, so it must be an attribute - # of the connection. + # The compressor is determined during the handshake, so it must be an + # attribute of the connection. # # @deprecated def_delegators :connection, :compressor # @return [ Monitoring ] monitoring The monitoring. attr_reader :monitoring - # Get the refresh interval for the server. This will be defined via an - # option or will default to 10. - # - # @return [ Float ] The heartbeat interval, in seconds. - # - # @since 2.0.0 - # @deprecated - def heartbeat_frequency - server.cluster.heartbeat_interval - end - # Runs the server monitor. Refreshing happens on a separate thread per # server. # # @example Run the monitor. # monitor.run @@ -121,56 +118,74 @@ # @return [ Thread ] The thread the monitor runs on. # # @since 2.0.0 def do_work scan! - server.scan_semaphore.wait(server.cluster.heartbeat_interval) + delta = @next_wanted_scan - Time.now + if delta > 0 + server.scan_semaphore.wait(delta) + end end - # Stop the background thread and wait for to terminate for a reasonable - # amount of time. + # Stop the background thread and wait for it to terminate for a + # reasonable amount of time. # # @return [ true | false ] Whether the thread was terminated. # # @api public for backwards compatibility only def stop! # Forward super's return value super.tap do # Important: disconnect should happen after the background thread - # terminated. - connection.disconnect! + # terminates. + connection&.disconnect! end end # Perform a check of the server with throttling, and update # the server's description and average round trip time. # - # If the server was checked less than MIN_SCAN_FREQUENCY seconds - # ago, sleep until MIN_SCAN_FREQUENCY seconds have passed since the last + # If the server was checked less than MIN_SCAN_INTERVAL seconds + # ago, sleep until MIN_SCAN_INTERVAL seconds have passed since the last # check. Then perform the check which involves running isMaster # on the server being monitored and updating the server description # as a result. # - # @note If the system clock is set to a time in the past, this method - # can sleep for a very long time. + # @note If the system clock moves backwards, this method can sleep + # for a very long time. # # @note The return value of this method is deprecated. In version 3.0.0 # this method will not have a return value. # - # @example Run a scan. - # monitor.scan! - # # @return [ Description ] The updated description. # # @since 2.0.0 def scan! - throttle_scan_frequency! - result = ismaster - new_description = Description.new(server.address, result, - server.round_trip_time_averager.average_round_trip_time) - server.cluster.run_sdam_flow(server.description, new_description) - server.description + # Ordinarily the background thread would invoke this method. + # But it is also possible to invoke scan! directly on a monitor. + # Allow only one scan to be performed at a time. + @mutex.synchronize do + throttle_scan_frequency! + + result = do_scan + + old_description = server.description + + new_description = Description.new(server.address, result, + server.round_trip_time_averager.average_round_trip_time) + + server.cluster.run_sdam_flow(server.description, new_description) + + server.description.tap do |new_description| + if new_description.unknown? && !old_description.unknown? + @next_earliest_scan = @next_wanted_scan = Time.now + else + @next_earliest_scan = Time.now + MIN_SCAN_INTERVAL + @next_wanted_scan = Time.now + heartbeat_interval + end + end + end end # Restarts the server monitor unless the current thread is alive. # # @example Restart the monitor. @@ -191,57 +206,86 @@ def pre_stop server.scan_semaphore.signal end - def ismaster - @mutex.synchronize do + def do_scan + if monitoring.monitoring? + monitoring.started( + Monitoring::SERVER_HEARTBEAT, + Monitoring::Event::ServerHeartbeatStarted.new(server.address) + ) + end + + # The duration we publish in heartbeat succeeded/failed events is + # the time spent on the entire heartbeat. This could include time + # to connect the socket (including TLS handshake), not just time + # spent on ismaster call itself. + # The spec at https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring-monitoring.rst + # requires that the duration exposed here start from "sending the + # message" (ismaster). This requirement does not make sense if, + # for example, we were never able to connect to the server at all + # and thus ismaster was never sent. + start_time = Time.now + + begin + result = ismaster + rescue => exc + log_warn("Error running ismaster on #{server.address}: #{exc.class}: #{exc}:\n#{exc.backtrace[0..5].join("\n")}") if monitoring.monitoring? - monitoring.started( + monitoring.failed( Monitoring::SERVER_HEARTBEAT, - Monitoring::Event::ServerHeartbeatStarted.new(server.address) + Monitoring::Event::ServerHeartbeatFailed.new(server.address, Time.now-start_time, exc) ) end - - result, exc, rtt, average_rtt = server.round_trip_time_averager.measure do - connection.ismaster + result = {} + else + if monitoring.monitoring? + monitoring.succeeded( + Monitoring::SERVER_HEARTBEAT, + Monitoring::Event::ServerHeartbeatSucceeded.new(server.address, Time.now-start_time) + ) end - if exc - log_debug("Error running ismaster on #{server.address}: #{exc.class}: #{exc}:\n#{exc.backtrace[0..5].join("\n")}") - if monitoring.monitoring? - monitoring.failed( - Monitoring::SERVER_HEARTBEAT, - Monitoring::Event::ServerHeartbeatFailed.new(server.address, rtt, exc) - ) + end + result + end + + def ismaster + if @connection && @connection.pid != Process.pid + log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid}") + @connection.disconnect! + @connection = nil + end + + if @connection + result = server.round_trip_time_averager.measure do + begin + message = @connection.dispatch_bytes(Monitor::Connection::ISMASTER_BYTES) + message.documents.first + rescue Mongo::Error + @connection.disconnect! + @connection = nil + raise end - result = {} - else - if monitoring.monitoring? - monitoring.succeeded( - Monitoring::SERVER_HEARTBEAT, - Monitoring::Event::ServerHeartbeatSucceeded.new(server.address, rtt) - ) - end end - result + else + connection = Connection.new(server.address, options) + connection.connect! + result = server.round_trip_time_averager.measure do + connection.handshake! + end + @connection = connection end + result end # @note If the system clock is set to a time in the past, this method # can sleep for a very long time. def throttle_scan_frequency! - # Normally server.last_scan indicates when the previous scan - # completed, but if scan! is manually invoked repeatedly then - # server.last_scan won't be updated and multiple scans with no - # cooldown can be obtained. Guard against repeated direct scan! - # invocation also. - last_time = [server.last_scan, @scan_started_at].compact.max - if last_time - difference = (Time.now - last_time) - throttle_time = (MIN_SCAN_FREQUENCY - difference) - sleep(throttle_time) if throttle_time > 0 + delta = @next_earliest_scan - Time.now + if delta > 0 + sleep(delta) end - @scan_started_at = Time.now end end end end