# frozen_string_literal: true
# encoding: utf-8

# Copyright (C) 2015-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo

  # The class defines behavior for the performance monitoring API.
  #
  # @since 2.1.0
  class Monitoring
    include Id

    # The command topic.
    #
    # @since 2.1.0
    COMMAND = 'Command'.freeze

    # The connection pool topic.
    #
    # @since 2.9.0
    CONNECTION_POOL = 'ConnectionPool'.freeze

    # Server closed topic.
    #
    # @since 2.4.0
    SERVER_CLOSED = 'ServerClosed'.freeze

    # Server description changed topic.
    #
    # @since 2.4.0
    SERVER_DESCRIPTION_CHANGED = 'ServerDescriptionChanged'.freeze

    # Server opening topic.
    #
    # @since 2.4.0
    SERVER_OPENING = 'ServerOpening'.freeze

    # Topology changed topic.
    #
    # @since 2.4.0
    TOPOLOGY_CHANGED = 'TopologyChanged'.freeze

    # Topology closed topic.
    #
    # @since 2.4.0
    TOPOLOGY_CLOSED = 'TopologyClosed'.freeze

    # Topology opening topic.
    #
    # @since 2.4.0
    TOPOLOGY_OPENING = 'TopologyOpening'.freeze

    # Server heartbeat topic. Heartbeat started, succeeded and failed
    # events are all published under this topic.
    #
    # @since 2.7.0
    SERVER_HEARTBEAT = 'ServerHeartbeat'.freeze

    # Used for generating unique operation ids to link events together.
    #
    # @example Get the next operation id.
    #   Monitoring.next_operation_id
    #
    # @return [ Integer ] The next operation id.
    #
    # @since 2.1.0
    def self.next_operation_id
      self.next_id
    end

    # Contains subscription methods common between monitoring and
    # global event subscriptions.
    #
    # @since 2.6.0
    module Subscribable

      # Shared, frozen stand-in returned when a topic has no registry entry,
      # so that lookups never have to create one.
      NO_SUBSCRIBERS = [].freeze
      private_constant :NO_SUBSCRIBERS

      # Subscribe a listener to an event topic.
      #
      # @note It is possible to subscribe the same listener to the same topic
      #   multiple times, in which case the listener will be invoked as many
      #   times as it is subscribed and to unsubscribe it the same number
      #   of unsubscribe calls will be needed.
      #
      # @example Subscribe to the topic.
      #   monitoring.subscribe(Mongo::Monitoring::COMMAND, subscriber)
      #
      # @example Subscribe to the topic globally.
      #   Mongo::Monitoring::Global.subscribe(Mongo::Monitoring::COMMAND, subscriber)
      #
      # @param [ String ] topic The event topic.
      # @param [ Object ] subscriber The subscriber to handle the event.
      #
      # @since 2.1.0
      def subscribe(topic, subscriber)
        subscribers_for(topic).push(subscriber)
      end

      # Unsubscribe a listener from an event topic.
      #
      # If the listener was subscribed to the event topic multiple times,
      # this call removes a single subscription.
      #
      # If the listener was not subscribed to the topic, this operation
      # is a no-op and no exceptions are raised.
      #
      # @note Global subscriber registry is separate from per-client
      #   subscriber registry. The same subscriber can be subscribed to
      #   events from a particular client as well as globally; unsubscribing
      #   globally will not unsubscribe that subscriber from the client
      #   it was explicitly subscribed to.
      #
      # @note Currently the list of global subscribers is copied into
      #   a client whenever the client is created. Thus unsubscribing a
      #   subscriber globally has no effect for existing clients - they will
      #   continue sending events to the unsubscribed subscriber.
      #
      # @example Unsubscribe from the topic.
      #   monitoring.unsubscribe(Mongo::Monitoring::COMMAND, subscriber)
      #
      # @example Unsubscribe from the topic globally.
      #   Mongo::Monitoring::Global.unsubscribe(Mongo::Monitoring::COMMAND, subscriber)
      #
      # @param [ String ] topic The event topic.
      # @param [ Object ] subscriber The subscriber to be unsubscribed.
      #
      # @since 2.6.0
      def unsubscribe(topic, subscriber)
        # Look the topic up without creating an entry: unsubscribing from a
        # topic nobody subscribed to must leave the registry untouched.
        subs = subscribers[topic]
        return unless subs

        index = subs.index(subscriber)
        subs.delete_at(index) if index
      end

      # Get all the subscribers.
      #
      # @example Get all the subscribers.
      #   monitoring.subscribers
      #
      # @example Get all the global subscribers.
      #   Mongo::Monitoring::Global.subscribers
      #
      # @return [ Hash ] The subscribers, keyed by topic; each value is the
      #   array of subscribers registered for that topic.
      #
      # @since 2.1.0
      def subscribers
        @subscribers ||= {}
      end

      # Determine if there are any subscribers for a particular event.
      #
      # This is a pure query: it does not add an entry for the topic to the
      # subscribers hash.
      #
      # @example Are there subscribers?
      #   monitoring.subscribers?(COMMAND)
      #
      # @example Are there global subscribers?
      #   Mongo::Monitoring::Global.subscribers?(COMMAND)
      #
      # @param [ String ] topic The event topic.
      #
      # @return [ true, false ] If there are subscribers for the topic.
      #
      # @since 2.1.0
      def subscribers?(topic)
        !registered_subscribers(topic).empty?
      end

      private

      # Returns the mutable subscriber list for the topic, creating an empty
      # one on first use. Only the subscription path should call this.
      #
      # @param [ String ] topic The event topic.
      #
      # @return [ Array ] The subscribers registered for the topic.
      def subscribers_for(topic)
        subscribers[topic] ||= []
      end

      # Returns the subscribers currently registered for the topic without
      # modifying the registry.
      #
      # @param [ String ] topic The event topic.
      #
      # @return [ Array ] The registered subscribers, or a frozen empty array
      #   when the topic has no entry. Callers must treat the result as
      #   read-only.
      def registered_subscribers(topic)
        subscribers.fetch(topic, NO_SUBSCRIBERS)
      end
    end

    # Allows subscribing to events for all Mongo clients.
    #
    # @note Global subscriptions must be established prior to creating
    #   clients. When a client is constructed it copies subscribers from
    #   the Global module; subsequent subscriptions or unsubscriptions
    #   on the Global module have no effect on already created clients.
    #
    # @since 2.1.0
    module Global
      extend Subscribable
    end

    include Subscribable

    # Initialize the monitoring.
    #
    # @example Create the new monitoring.
    #   Monitoring.new(:monitoring => true)
    #
    # @param [ Hash ] options Options. Client constructor forwards its
    #   options to Monitoring constructor, although Monitoring recognizes
    #   only a subset of the options recognized by Client.
    # @option options [ true, false ] :monitoring If false is given, the
    #   Monitoring instance is initialized without global monitoring event
    #   subscribers and will not publish SDAM events. Command monitoring events
    #   will still be published, and the driver will still perform SDAM and
    #   monitor its cluster in order to perform server selection. Built-in
    #   driver logging of SDAM events will be disabled because it is
    #   implemented through SDAM event subscription. Client#subscribe will
    #   succeed for all event types, but subscribers to SDAM events will
    #   not be invoked. Values other than false result in default behavior
    #   which is to perform normal SDAM event publication.
    #
    # @since 2.1.0
    # @api private
    def initialize(options = {})
      @options = options
      return unless monitoring?

      # Copy the global subscriptions first so that they are invoked before
      # the built-in log subscribers registered below.
      Global.subscribers.each do |topic, listeners|
        listeners.each do |listener|
          subscribe(topic, listener)
        end
      end

      subscribe(COMMAND, CommandLogSubscriber.new(options))
      # CMAP events are not logged by default because this will create
      # log entries for every operation performed by the driver.
      #subscribe(CONNECTION_POOL, CmapLogSubscriber.new(options))
      subscribe(SERVER_OPENING, ServerOpeningLogSubscriber.new(options))
      subscribe(SERVER_CLOSED, ServerClosedLogSubscriber.new(options))
      subscribe(SERVER_DESCRIPTION_CHANGED, ServerDescriptionChangedLogSubscriber.new(options))
      subscribe(TOPOLOGY_OPENING, TopologyOpeningLogSubscriber.new(options))
      subscribe(TOPOLOGY_CHANGED, TopologyChangedLogSubscriber.new(options))
      subscribe(TOPOLOGY_CLOSED, TopologyClosedLogSubscriber.new(options))
    end

    # @return [ Hash ] The options this instance was constructed with.
    #
    # @api private
    attr_reader :options

    # Whether monitoring is enabled for this instance.
    #
    # @return [ true, false ] false only when the :monitoring option is
    #   exactly false; any other value (including nil) yields true.
    #
    # @api private
    def monitoring?
      options[:monitoring] != false
    end

    # Publish an event.
    #
    # This method is used for event types which only have a single event
    # in them.
    #
    # @param [ String ] topic The event topic.
    # @param [ Event ] event The event to publish.
    #
    # @since 2.9.0
    def published(topic, event)
      registered_subscribers(topic).each { |subscriber| subscriber.published(event) }
    end

    # Publish a started event.
    #
    # This method is used for event types which have the started/succeeded/failed
    # events in them, such as command and heartbeat events.
    #
    # @example Publish a started event.
    #   monitoring.started(COMMAND, event)
    #
    # @param [ String ] topic The event topic.
    # @param [ Event ] event The event to publish.
    #
    # @since 2.1.0
    def started(topic, event)
      registered_subscribers(topic).each { |subscriber| subscriber.started(event) }
    end

    # Publish a succeeded event.
    #
    # This method is used for event types which have the started/succeeded/failed
    # events in them, such as command and heartbeat events.
    #
    # @example Publish a succeeded event.
    #   monitoring.succeeded(COMMAND, event)
    #
    # @param [ String ] topic The event topic.
    # @param [ Event ] event The event to publish.
    #
    # @since 2.1.0
    def succeeded(topic, event)
      registered_subscribers(topic).each { |subscriber| subscriber.succeeded(event) }
    end

    # Publish a failed event.
    #
    # This method is used for event types which have the started/succeeded/failed
    # events in them, such as command and heartbeat events.
    #
    # @example Publish a failed event.
    #   monitoring.failed(COMMAND, event)
    #
    # @param [ String ] topic The event topic.
    # @param [ Event ] event The event to publish.
    #
    # @since 2.1.0
    def failed(topic, event)
      registered_subscribers(topic).each { |subscriber| subscriber.failed(event) }
    end

    # Runs the given block as a server heartbeat, publishing heartbeat
    # started and succeeded/failed events around it when monitoring is
    # enabled. When monitoring is disabled the block is simply executed.
    #
    # @param [ Object ] server The server being checked; its address is
    #   attached to the published events.
    # @param [ true, false ] awaited Forwarded to the heartbeat events.
    #
    # @yield The heartbeat work to perform and time.
    #
    # @return [ Object ] The value returned by the block.
    #
    # @raise [ StandardError ] Any error raised by the block is re-raised
    #   after the failed event has been published.
    #
    # @api private
    def publish_heartbeat(server, awaited: false)
      # started_event stays nil when monitoring is disabled; it is only
      # read below under the same monitoring? condition.
      if monitoring?
        started_event = Event::ServerHeartbeatStarted.new(
          server.address, awaited: awaited)
        started(SERVER_HEARTBEAT, started_event)
      end

      # The duration we publish in heartbeat succeeded/failed events is
      # the time spent on the entire heartbeat. This could include time
      # to connect the socket (including TLS handshake), not just time
      # spent on hello call itself.
      # The spec at https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring-monitoring.rst
      # requires that the duration exposed here start from "sending the
      # message" (hello). This requirement does not make sense if,
      # for example, we were never able to connect to the server at all
      # and thus hello was never sent.
      start_time = Utils.monotonic_time

      begin
        result = yield
      rescue => exc
        if monitoring?
          event = Event::ServerHeartbeatFailed.new(
            server.address,
            Utils.monotonic_time - start_time,
            exc,
            awaited: awaited,
            started_event: started_event,
          )
          failed(SERVER_HEARTBEAT, event)
        end
        raise
      else
        if monitoring?
          event = Event::ServerHeartbeatSucceeded.new(
            server.address,
            Utils.monotonic_time - start_time,
            awaited: awaited,
            started_event: started_event,
          )
          succeeded(SERVER_HEARTBEAT, event)
        end
        result
      end
    end

    private

    # Gives a duplicated/cloned instance its own subscriber registry, so
    # that subscribing or unsubscribing on the copy does not affect the
    # original. The per-topic arrays are duplicated; the subscriber objects
    # themselves are shared between the two instances.
    #
    # @param [ Monitoring ] original The instance being copied.
    def initialize_copy(original)
      @subscribers = {}
      original.subscribers.each do |topic, listeners|
        @subscribers[topic] = listeners.dup
      end
    end
  end
end

require 'mongo/monitoring/event'
require 'mongo/monitoring/publishable'
require 'mongo/monitoring/command_log_subscriber'
require 'mongo/monitoring/cmap_log_subscriber'
require 'mongo/monitoring/sdam_log_subscriber'
require 'mongo/monitoring/server_description_changed_log_subscriber'
require 'mongo/monitoring/server_closed_log_subscriber'
require 'mongo/monitoring/server_opening_log_subscriber'
require 'mongo/monitoring/topology_changed_log_subscriber'
require 'mongo/monitoring/topology_opening_log_subscriber'
require 'mongo/monitoring/topology_closed_log_subscriber'
require 'mongo/monitoring/unified_sdam_log_subscriber'