# frozen_string_literal: true
# encoding: utf-8

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  class Server

    # Responsible for periodically polling a server via hello commands to
    # keep the server's status up to date.
    #
    # Does all work in a background thread so as to not interfere with other
    # operations performed by the driver.
    #
    # @since 2.0.0
    # @api private
    class Monitor
      include Loggable
      extend Forwardable
      include Event::Publisher
      include BackgroundThread

      # The default interval between server status refreshes is 10 seconds.
      #
      # @since 2.0.0
      DEFAULT_HEARTBEAT_INTERVAL = 10.freeze

      # The minimum time between forced server scans. Is
      # minHeartbeatFrequencyMS in the SDAM spec.
      #
      # @since 2.0.0
      MIN_SCAN_INTERVAL = 0.5.freeze

      # The weighting factor (alpha) for calculating the average moving
      # round trip time.
      #
      # @since 2.0.0
      # @deprecated Will be removed in version 3.0.
      RTT_WEIGHT_FACTOR = 0.2.freeze

      # Create the new server monitor.
      #
      # @example Create the server monitor.
      #   Mongo::Server::Monitor.new(server, listeners, monitoring, options)
      #
      # @note Monitor must never be directly instantiated outside of a Server.
      #
      # @param [ Server ] server The server to monitor.
      # @param [ Event::Listeners ] event_listeners The event listeners.
      # @param [ Monitoring ] monitoring The monitoring.
      # @param [ Hash ] options The options. The hash is frozen by this
      #   constructor.
      #
      # @option options [ Float ] :connect_timeout The timeout, in seconds, to
      #   use when establishing the monitoring connection.
      # @option options [ Float ] :heartbeat_interval The interval between
      #   regular server checks.
      # @option options [ Logger ] :logger A custom logger to use.
      # @option options [ Mongo::Server::Monitor::AppMetadata ] :app_metadata
      #   The metadata to use for regular monitoring connection. Required.
      # @option options [ Mongo::Server::Monitor::AppMetadata ] :push_monitor_app_metadata
      #   The metadata to use for push monitor's connection. Required.
      # @option options [ Float ] :socket_timeout The timeout, in seconds, to
      #   execute operations on the monitoring connection.
      #
      # @raise [ ArgumentError ] If monitoring is not a Monitoring instance,
      #   or if either of the :app_metadata or :push_monitor_app_metadata
      #   options is missing.
      #
      # @since 2.0.0
      # @api private
      def initialize(server, event_listeners, monitoring, options = {})
        unless monitoring.is_a?(Monitoring)
          raise ArgumentError, "Wrong monitoring type: #{monitoring.inspect}"
        end
        unless options[:app_metadata]
          raise ArgumentError, 'App metadata is required'
        end
        unless options[:push_monitor_app_metadata]
          raise ArgumentError, 'Push monitor app metadata is required'
        end
        @server = server
        @event_listeners = event_listeners
        @monitoring = monitoring
        @options = options.freeze
        # Serializes scans so that only one runs at a time (see #scan!).
        @mutex = Mutex.new
        # Serializes description updates (see #run_sdam_flow).
        @sdam_mutex = Mutex.new
        @next_earliest_scan = @next_wanted_scan = Time.now
        # Guards reads and writes of @push_monitor.
        @update_mutex = Mutex.new
      end

      # @return [ Server ] server The server that this monitor is monitoring.
      # @api private
      attr_reader :server

      # @return [ Mongo::Server::Monitor::Connection ] connection The connection to use.
      attr_reader :connection

      # @return [ Hash ] options The server options.
      attr_reader :options

      # The interval between regular server checks.
      #
      # @return [ Float ] The heartbeat interval, in seconds.
      def heartbeat_interval
        options[:heartbeat_interval] || DEFAULT_HEARTBEAT_INTERVAL
      end

      # @deprecated
      def_delegators :server, :last_scan

      # The compressor is determined during the handshake, so it must be an
      # attribute of the connection.
      #
      # @deprecated
      def_delegators :connection, :compressor

      # @return [ Monitoring ] monitoring The monitoring.
      attr_reader :monitoring

      # @return [ Server::PushMonitor | nil ] The push monitor, if one is being
      #   used.
      def push_monitor
        @update_mutex.synchronize { @push_monitor }
      end

      # Perform a check of the server.
      #
      # Scans the server once, then waits on the server's scan semaphore
      # until the next wanted scan time is reached, the semaphore is
      # signaled, or a stop has been requested.
      #
      # @since 2.0.0
      def do_work
        scan!
        # @next_wanted_scan may be updated by the push monitor.
        # However we need to check for termination flag so that the monitor
        # thread exits when requested.
        loop do
          delta = @next_wanted_scan - Time.now
          break unless delta > 0

          signaled = server.scan_semaphore.wait(delta)
          break if signaled || @stop_requested
        end
      end

      # Stop the background thread and wait for it to terminate for a
      # reasonable amount of time.
      #
      # @return [ true | false ] Whether the thread was terminated.
      #
      # @api public for backwards compatibility only
      def stop!
        stop_push_monitor!

        # Forward super's return value
        super.tap do
          # Important: disconnect should happen after the background thread
          # terminates.
          connection&.disconnect!
        end
      end

      # Ensure a push monitor exists for this monitor, creating one when
      # there is none or when the existing one is no longer running.
      #
      # The push monitor's socket timeout is the heartbeat interval plus the
      # monitoring connection's socket timeout, and it reuses the monitoring
      # connection's check document.
      #
      # @note Reads the current monitoring connection, so it must be called
      #   while that connection is set.
      #
      # @param [ TopologyVersion ] topology_version The topology version
      #   passed to a newly created push monitor.
      #
      # @return [ Server::PushMonitor ] The existing or newly created push
      #   monitor.
      def create_push_monitor!(topology_version)
        @update_mutex.synchronize do
          # Discard a push monitor that has stopped so a fresh one is built.
          if @push_monitor && !@push_monitor.running?
            @push_monitor = nil
          end

          @push_monitor ||= PushMonitor.new(
            self,
            topology_version,
            monitoring,
            **Utils.shallow_symbolize_keys(options.merge(
              socket_timeout: heartbeat_interval + connection.socket_timeout,
              app_metadata: options[:push_monitor_app_metadata],
              check_document: @connection.check_document
            )),
          )
        end
      end

      # Stop the push monitor, if there is one, and forget it.
      def stop_push_monitor!
        @update_mutex.synchronize do
          if @push_monitor
            @push_monitor.stop!
            @push_monitor = nil
          end
        end
      end

      # Perform a check of the server with throttling, and update
      # the server's description and average round trip time.
      #
      # If the server was checked less than MIN_SCAN_INTERVAL seconds
      # ago, sleep until MIN_SCAN_INTERVAL seconds have passed since the last
      # check. Then perform the check which involves running hello
      # on the server being monitored and updating the server description
      # as a result.
      #
      # @note If the system clock moves backwards, this method can sleep
      #   for a very long time.
      #
      # @note The return value of this method is deprecated. In version 3.0.0
      #   this method will not have a return value.
      #
      # @return [ Description ] The updated description.
      #
      # @since 2.0.0
      def scan!
        # Ordinarily the background thread would invoke this method.
        # But it is also possible to invoke scan! directly on a monitor.
        # Allow only one scan to be performed at a time.
        @mutex.synchronize do
          throttle_scan_frequency!

          result = do_scan

          run_sdam_flow(result)
        end
      end

      # Build a new description from a check result and run the cluster's
      # SDAM flow with it, then schedule the next scan.
      #
      # Scheduling is skipped for awaited results. Otherwise, if the server
      # has just become unknown the next scan is wanted immediately; in all
      # other cases the next scan is wanted after the heartbeat interval and
      # not earlier than MIN_SCAN_INTERVAL from now.
      #
      # @param [ Hash ] result The server's reply to the check, or an empty
      #   hash when the check failed.
      # @param [ true | false ] awaited Whether the result was awaited;
      #   forwarded to the cluster's SDAM flow.
      #
      # @return [ Description ] The server's description after the SDAM flow
      #   has run.
      def run_sdam_flow(result, awaited: false)
        @sdam_mutex.synchronize do
          old_description = server.description

          new_description = Description.new(server.address, result,
            average_round_trip_time: server.round_trip_time_averager.average_round_trip_time
          )

          server.cluster.run_sdam_flow(server.description, new_description, awaited: awaited)

          # The description is re-read from the server after the flow has
          # run; this, rather than the locally built description, is what
          # gets inspected and returned.
          server.description.tap do |updated_description|
            unless awaited
              if updated_description.unknown? && !old_description.unknown?
                @next_earliest_scan = @next_wanted_scan = Time.now
              else
                @next_earliest_scan = Time.now + MIN_SCAN_INTERVAL
                @next_wanted_scan = Time.now + heartbeat_interval
              end
            end
          end
        end
      end

      # Restarts the server monitor unless the current thread is alive.
      #
      # @example Restart the monitor.
      #   monitor.restart!
      #
      # @return [ Thread ] The thread the monitor runs on.
      #
      # @since 2.1.0
      def restart!
        if @thread&.alive?
          @thread
        else
          run!
        end
      end

      private

      # Signals the scan semaphore so that a thread waiting on it in
      # #do_work wakes up.
      #
      # NOTE(review): presumably invoked by BackgroundThread before the
      # thread is stopped - confirm against BackgroundThread.
      def pre_stop
        server.scan_semaphore.signal
      end

      # Run a server check wrapped in heartbeat event publication.
      #
      # Errors raised by the check are not propagated; they are reported via
      # Utils.warn_bg_exception and an empty result is returned instead.
      #
      # @return [ Hash ] The result of the check, or an empty hash when the
      #   check raised.
      def do_scan
        monitoring.publish_heartbeat(server) do
          check
        end
      rescue => exc
        msg = "Error checking #{server.address}"
        Utils.warn_bg_exception(msg, exc,
          logger: options[:logger],
          log_prefix: options[:log_prefix],
          bg_error_backtrace: options[:bg_error_backtrace],
        )
        {}
      end

      # Perform a single check of the server and return the server's reply.
      #
      # When a monitoring connection already exists, the check document is
      # sent over it. Otherwise a new connection is established and
      # handshaken, and the push monitor is started or stopped depending on
      # whether the handshake reply carries a topology version.
      #
      # The round trip of the check or handshake is measured by the
      # server's round trip time averager.
      #
      # @return [ Hash ] The reply to the check or to the handshake.
      #
      # @raise [ Mongo::Error ] If checking over the existing connection
      #   fails; that connection is disconnected and discarded first.
      def check
        # A connection created by another process must not be reused here.
        if @connection && @connection.pid != Process.pid
          log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid})")
          @connection.disconnect!
          @connection = nil
        end

        if @connection
          result = server.round_trip_time_averager.measure do
            begin
              doc = @connection.check_document
              cmd = Protocol::Query.new(
                Database::ADMIN, Database::COMMAND, doc, :limit => -1
              )
              message = @connection.dispatch_bytes(cmd.serialize.to_s)
              message.documents.first
            rescue Mongo::Error
              # Discard the connection so the next check reconnects.
              @connection.disconnect!
              @connection = nil
              raise
            end
          end
        else
          connection = Connection.new(server.address, options)
          connection.connect!
          begin
            result = server.round_trip_time_averager.measure do
              connection.handshake!
            end
          rescue
            # The connection has not been stored in @connection yet, so
            # nothing else would ever disconnect it. Do so here before
            # propagating the error.
            connection.disconnect!
            raise
          end
          @connection = connection
          if (tv_doc = result['topologyVersion'])
            # Successful response, server 4.4+
            create_push_monitor!(TopologyVersion.new(tv_doc))
            push_monitor.run!
          else
            # Failed response or pre-4.4 server
            stop_push_monitor!
          end
        end
        result
      end

      # Sleep until the earliest time at which the next scan is permitted.
      #
      # @note If the system clock is set to a time in the past, this method
      #   can sleep for a very long time.
      def throttle_scan_frequency!
        delta = @next_earliest_scan - Time.now
        sleep(delta) if delta > 0
      end
    end
  end
end

require 'mongo/server/monitor/connection'
require 'mongo/server/monitor/app_metadata'