lib/mongo/server/monitor.rb in mongo-2.12.4 vs lib/mongo/server/monitor.rb in mongo-2.13.0.beta1
- old
+ new
@@ -1,6 +1,6 @@
-# Copyright (C) 2014-2019 MongoDB Inc.
+# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
@@ -20,26 +20,27 @@
#
# Does all work in a background thread so as to not interfere with other
# operations performed by the driver.
#
# @since 2.0.0
+ # @api private
class Monitor
include Loggable
extend Forwardable
include Event::Publisher
include BackgroundThread
- # The default time for a server to refresh its status is 10 seconds.
+ # The default interval between server status refreshes is 10 seconds.
#
# @since 2.0.0
- HEARTBEAT_FREQUENCY = 10.freeze
+ DEFAULT_HEARTBEAT_INTERVAL = 10.freeze
# The minimum time between forced server scans. Is
# minHeartbeatFrequencyMS in the SDAM spec.
#
# @since 2.0.0
- MIN_SCAN_FREQUENCY = 0.5.freeze
+ MIN_SCAN_INTERVAL = 0.5.freeze
# The weighting factor (alpha) for calculating the average moving round trip time.
#
# @since 2.0.0
# @deprecated Will be removed in version 3.0.
@@ -57,10 +58,12 @@
# @param [ Monitoring ] monitoring The monitoring..
# @param [ Hash ] options The options.
#
# @option options [ Float ] :connect_timeout The timeout, in seconds, to
# use when establishing the monitoring connection.
+ # @option options [ Float ] :heartbeat_interval The interval between
+ # regular server checks.
# @option options [ Logger ] :logger A custom logger to use.
# @option options [ Float ] :socket_timeout The timeout, in seconds, to
# execute operations on the monitoring connection.
#
# @since 2.0.0
@@ -71,14 +74,12 @@
end
@server = server
@event_listeners = event_listeners
@monitoring = monitoring
@options = options.freeze
- # This is a Mongo::Server::Monitor::Connection
- @connection = Connection.new(server.address, options)
@mutex = Mutex.new
- @scan_started_at = nil
+ @next_earliest_scan = @next_wanted_scan = Time.now
end
# @return [ Server ] server The server that this monitor is monitoring.
# @api private
attr_reader :server
@@ -87,33 +88,29 @@
attr_reader :connection
# @return [ Hash ] options The server options.
attr_reader :options
+ # The interval between regular server checks.
+ #
+ # @return [ Float ] The heartbeat interval, in seconds.
+ def heartbeat_interval
+ options[:heartbeat_interval] || DEFAULT_HEARTBEAT_INTERVAL
+ end
+
# @deprecated
def_delegators :server, :last_scan
- # The compressor is determined during the handshake, so it must be an attribute
- # of the connection.
+ # The compressor is determined during the handshake, so it must be an
+ # attribute of the connection.
#
# @deprecated
def_delegators :connection, :compressor
# @return [ Monitoring ] monitoring The monitoring.
attr_reader :monitoring
- # Get the refresh interval for the server. This will be defined via an
- # option or will default to 10.
- #
- # @return [ Float ] The heartbeat interval, in seconds.
- #
- # @since 2.0.0
- # @deprecated
- def heartbeat_frequency
- server.cluster.heartbeat_interval
- end
-
# Runs the server monitor. Refreshing happens on a separate thread per
# server.
#
# @example Run the monitor.
# monitor.run
@@ -121,56 +118,74 @@
# @return [ Thread ] The thread the monitor runs on.
#
# @since 2.0.0
def do_work
scan!
- server.scan_semaphore.wait(server.cluster.heartbeat_interval)
+ delta = @next_wanted_scan - Time.now
+ if delta > 0
+ server.scan_semaphore.wait(delta)
+ end
end
- # Stop the background thread and wait for to terminate for a reasonable
- # amount of time.
+ # Stop the background thread and wait for it to terminate for a
+ # reasonable amount of time.
#
# @return [ true | false ] Whether the thread was terminated.
#
# @api public for backwards compatibility only
def stop!
# Forward super's return value
super.tap do
# Important: disconnect should happen after the background thread
- # terminated.
- connection.disconnect!
+ # terminates.
+ connection&.disconnect!
end
end
# Perform a check of the server with throttling, and update
# the server's description and average round trip time.
#
- # If the server was checked less than MIN_SCAN_FREQUENCY seconds
- # ago, sleep until MIN_SCAN_FREQUENCY seconds have passed since the last
+ # If the server was checked less than MIN_SCAN_INTERVAL seconds
+ # ago, sleep until MIN_SCAN_INTERVAL seconds have passed since the last
# check. Then perform the check which involves running isMaster
# on the server being monitored and updating the server description
# as a result.
#
- # @note If the system clock is set to a time in the past, this method
- # can sleep for a very long time.
+ # @note If the system clock moves backwards, this method can sleep
+ # for a very long time.
#
# @note The return value of this method is deprecated. In version 3.0.0
# this method will not have a return value.
#
- # @example Run a scan.
- # monitor.scan!
- #
# @return [ Description ] The updated description.
#
# @since 2.0.0
def scan!
- throttle_scan_frequency!
- result = ismaster
- new_description = Description.new(server.address, result,
- server.round_trip_time_averager.average_round_trip_time)
- server.cluster.run_sdam_flow(server.description, new_description)
- server.description
+ # Ordinarily the background thread would invoke this method.
+ # But it is also possible to invoke scan! directly on a monitor.
+ # Allow only one scan to be performed at a time.
+ @mutex.synchronize do
+ throttle_scan_frequency!
+
+ result = do_scan
+
+ old_description = server.description
+
+ new_description = Description.new(server.address, result,
+ server.round_trip_time_averager.average_round_trip_time)
+
+ server.cluster.run_sdam_flow(server.description, new_description)
+
+ server.description.tap do |new_description|
+ if new_description.unknown? && !old_description.unknown?
+ @next_earliest_scan = @next_wanted_scan = Time.now
+ else
+ @next_earliest_scan = Time.now + MIN_SCAN_INTERVAL
+ @next_wanted_scan = Time.now + heartbeat_interval
+ end
+ end
+ end
end
# Restarts the server monitor unless the current thread is alive.
#
# @example Restart the monitor.
@@ -191,57 +206,86 @@
def pre_stop
server.scan_semaphore.signal
end
- def ismaster
- @mutex.synchronize do
+ def do_scan
+ if monitoring.monitoring?
+ monitoring.started(
+ Monitoring::SERVER_HEARTBEAT,
+ Monitoring::Event::ServerHeartbeatStarted.new(server.address)
+ )
+ end
+
+ # The duration we publish in heartbeat succeeded/failed events is
+ # the time spent on the entire heartbeat. This could include time
+ # to connect the socket (including TLS handshake), not just time
+ # spent on ismaster call itself.
+ # The spec at https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring-monitoring.rst
+ # requires that the duration exposed here start from "sending the
+ # message" (ismaster). This requirement does not make sense if,
+ # for example, we were never able to connect to the server at all
+ # and thus ismaster was never sent.
+ start_time = Time.now
+
+ begin
+ result = ismaster
+ rescue => exc
+ log_warn("Error running ismaster on #{server.address}: #{exc.class}: #{exc}:\n#{exc.backtrace[0..5].join("\n")}")
if monitoring.monitoring?
- monitoring.started(
+ monitoring.failed(
Monitoring::SERVER_HEARTBEAT,
- Monitoring::Event::ServerHeartbeatStarted.new(server.address)
+ Monitoring::Event::ServerHeartbeatFailed.new(server.address, Time.now-start_time, exc)
)
end
-
- result, exc, rtt, average_rtt = server.round_trip_time_averager.measure do
- connection.ismaster
+ result = {}
+ else
+ if monitoring.monitoring?
+ monitoring.succeeded(
+ Monitoring::SERVER_HEARTBEAT,
+ Monitoring::Event::ServerHeartbeatSucceeded.new(server.address, Time.now-start_time)
+ )
end
- if exc
- log_debug("Error running ismaster on #{server.address}: #{exc.class}: #{exc}:\n#{exc.backtrace[0..5].join("\n")}")
- if monitoring.monitoring?
- monitoring.failed(
- Monitoring::SERVER_HEARTBEAT,
- Monitoring::Event::ServerHeartbeatFailed.new(server.address, rtt, exc)
- )
+ end
+ result
+ end
+
+ def ismaster
+ if @connection && @connection.pid != Process.pid
+ log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid}")
+ @connection.disconnect!
+ @connection = nil
+ end
+
+ if @connection
+ result = server.round_trip_time_averager.measure do
+ begin
+ message = @connection.dispatch_bytes(Monitor::Connection::ISMASTER_BYTES)
+ message.documents.first
+ rescue Mongo::Error
+ @connection.disconnect!
+ @connection = nil
+ raise
end
- result = {}
- else
- if monitoring.monitoring?
- monitoring.succeeded(
- Monitoring::SERVER_HEARTBEAT,
- Monitoring::Event::ServerHeartbeatSucceeded.new(server.address, rtt)
- )
- end
end
- result
+ else
+ connection = Connection.new(server.address, options)
+ connection.connect!
+ result = server.round_trip_time_averager.measure do
+ connection.handshake!
+ end
+ @connection = connection
end
+ result
end
# @note If the system clock is set to a time in the past, this method
# can sleep for a very long time.
def throttle_scan_frequency!
- # Normally server.last_scan indicates when the previous scan
- # completed, but if scan! is manually invoked repeatedly then
- # server.last_scan won't be updated and multiple scans with no
- # cooldown can be obtained. Guard against repeated direct scan!
- # invocation also.
- last_time = [server.last_scan, @scan_started_at].compact.max
- if last_time
- difference = (Time.now - last_time)
- throttle_time = (MIN_SCAN_FREQUENCY - difference)
- sleep(throttle_time) if throttle_time > 0
+ delta = @next_earliest_scan - Time.now
+ if delta > 0
+ sleep(delta)
end
- @scan_started_at = Time.now
end
end
end
end