lib/contrast/agent/thread_watcher.rb in contrast-agent-6.0.0 vs lib/contrast/agent/thread_watcher.rb in contrast-agent-6.1.0

- old
+ new

@@ -3,116 +3,100 @@ require 'contrast/components/logger' require 'contrast/agent/service_heartbeat' require 'contrast/api/communication/messaging_queue' require 'contrast/agent/reporting/report' -require 'contrast/agent/telemetry/telemetry' +require 'contrast/agent/reporting/reporter_heartbeat' +require 'contrast/agent/telemetry/base' module Contrast module Agent # This class used to ensure that our worker threads are running in multi-process environments - # - # @attr_reader heapdump_util [Contrast::Utils::HeapDumpUtil] - # @attr_reader heartbeat [Contrast::Agent::ServiceHeartbeat] - # @attr_reader messaging_queue [Contrast::Api::Communication::MessagingQueue] class ThreadWatcher include Contrast::Components::Logger::InstanceMethods - attr_reader :heapdump_util, :heartbeat, :messaging_queue + # @return [Contrast::Utils::HeapDumpUtil] + attr_reader :heapdump_util + # @return [Contrast::Agent::ServiceHeartbeat, nil] + attr_reader :heartbeat + # @return [Contrast::Api::Communication::MessagingQueue, nil] + attr_reader :messaging_queue + # @return [Contrast::Agent::Reporter] + attr_reader :reporter + # @return [Contrast::Agent::ReporterHeartbeat] + attr_reader :reporter_heartbeat def initialize @pids = {} @heapdump_util = Contrast::Utils::HeapDumpUtil.new - @heartbeat = Contrast::Agent::ServiceHeartbeat.new - @messaging_queue = Contrast::Api::Communication::MessagingQueue.new - @telemetry = Contrast::Agent::Telemetry.new if Contrast::Agent::Telemetry.enabled? - @reporter = Contrast::Agent::Reporter.new if Contrast::Agent::Reporter.enabled? + unless ::Contrast::CONTRAST_SERVICE.unnecessary? + @heartbeat = Contrast::Agent::ServiceHeartbeat.new + @messaging_queue = Contrast::Api::Communication::MessagingQueue.new + end + if Contrast::Agent::Reporter.enabled? + @reporter = Contrast::Agent::Reporter.new + @reporter_heartbeat = Contrast::Agent::ReporterHeartbeat.new + end + @telemetry = Contrast::Agent::Telemetry::Base.new if Contrast::Agent::Telemetry::Base.enabled? end + # @return [Hash, nil] map of process to thread startup status def startup! return unless ::Contrast::AGENT.enabled? - telemetry_status = telemetry_thread_init - heartbeat_status = heartbeat_thread_init - messaging_status = messaging_thread_init - reporter_status = reporter_thread_init - logger.debug('ThreadWatcher started threads') - + heartbeat_status = init_thread(heartbeat) + messaging_status = init_thread(messaging_queue) @pids[Process.pid] = messaging_status && heartbeat_status - return @pids unless Contrast::Agent::Telemetry.enabled? - - @pids[Process.pid] = @pids[Process.pid] && telemetry_status + if Contrast::Agent::Telemetry::Base.enabled? + telemetry_status = init_thread(telemetry_queue) + @pids[Process.pid] = @pids[Process.pid] && telemetry_status + end return @pids unless Contrast::Agent::Reporter.enabled? - @pids[Process.pid] = @pids[Process.pid] && reporter_status + reporter_status = init_thread(reporter) + reporter_heartbeat_status = init_thread(reporter_heartbeat) + @pids[Process.pid] = @pids[Process.pid] && reporter_status && reporter_heartbeat_status + @pids end def ensure_running? return if @pids[Process.pid] == true logger.debug('ThreadWatcher - threads not running') startup! end def shutdown! - heartbeat.stop! - messaging_queue.stop! - heapdump_util.stop! + heartbeat&.stop! + messaging_queue&.stop! + heapdump_util&.stop! telemetry_queue&.stop! reporter&.stop! + reporter_heartbeat&.stop! end - def heartbeat_thread_init - unless heartbeat.running? - logger.debug('Attempting to start heartbeat thread') - heartbeat.start_thread! - end - heartbeat_result = heartbeat.running? - logger.debug('Heartbeat thread status', alive: heartbeat_result) - heartbeat_result - end - - def telemetry_thread_init - @telemetry.start_thread! if @telemetry&.attempt_to_start? - telemetry_result = @telemetry&.running? - logger.debug('Telemetry thread status', alive: telemetry_result) - telemetry_result - end - - def messaging_thread_init - unless messaging_queue.running? - logger.debug('Attempting to start messaging queue thread') - messaging_queue.start_thread! - end - messaging_result = messaging_queue.running? - logger.debug('Messaging thread status', alive: messaging_result) - messaging_result - end - - def reporter_thread_init - @reporter.start_thread! if @reporter&.attempt_to_start? - unless @reporter&.running? - logger.debug('Attempting to start reporter thread') - @reporter&.start_thread! - end - reporter_result = @reporter&.running? - logger.debug('Reporter thread status', alive: reporter_result) - reporter_result - end - - # @return [Contrast::Agent::Telemetry] + # @return [Contrast::Agent::Telemetry::Base] def telemetry_queue - return if @telemetry.nil? - @telemetry end - # @return [Contrast::Agent::Reporter] - def reporter - return if @reporter.nil? + private - @reporter + # Start the thread governed by the given watcher + # + # @param watcher [Contrast::Agent::ThreadWatcher] + # @return [Boolean] if the watched thread started successfully + def init_thread watcher + return unless watcher&.attempt_to_start? + + unless watcher.running? + logger.debug('Attempting to start thread', type: watcher.to_s) + watcher.start_thread! + end + result = watcher.running? + logger.debug('Thread status', type: watcher.to_s, alive: result) + result end end end end