lib/contrast/agent/thread_watcher.rb in contrast-agent-6.0.0 vs lib/contrast/agent/thread_watcher.rb in contrast-agent-6.1.0
- old
+ new
@@ -3,116 +3,100 @@
require 'contrast/components/logger'
require 'contrast/agent/service_heartbeat'
require 'contrast/api/communication/messaging_queue'
require 'contrast/agent/reporting/report'
-require 'contrast/agent/telemetry/telemetry'
+require 'contrast/agent/reporting/reporter_heartbeat'
+require 'contrast/agent/telemetry/base'
module Contrast
module Agent
# This class used to ensure that our worker threads are running in multi-process environments
- #
- # @attr_reader heapdump_util [Contrast::Utils::HeapDumpUtil]
- # @attr_reader heartbeat [Contrast::Agent::ServiceHeartbeat]
- # @attr_reader messaging_queue [Contrast::Api::Communication::MessagingQueue]
class ThreadWatcher
include Contrast::Components::Logger::InstanceMethods
- attr_reader :heapdump_util, :heartbeat, :messaging_queue
+ # @return [Contrast::Utils::HeapDumpUtil]
+ attr_reader :heapdump_util
+ # @return [Contrast::Agent::ServiceHeartbeat, nil]
+ attr_reader :heartbeat
+ # @return [Contrast::Api::Communication::MessagingQueue, nil]
+ attr_reader :messaging_queue
+ # @return [Contrast::Agent::Reporter]
+ attr_reader :reporter
+ # @return [Contrast::Agent::ReporterHeartbeat]
+ attr_reader :reporter_heartbeat
def initialize
@pids = {}
@heapdump_util = Contrast::Utils::HeapDumpUtil.new
- @heartbeat = Contrast::Agent::ServiceHeartbeat.new
- @messaging_queue = Contrast::Api::Communication::MessagingQueue.new
- @telemetry = Contrast::Agent::Telemetry.new if Contrast::Agent::Telemetry.enabled?
- @reporter = Contrast::Agent::Reporter.new if Contrast::Agent::Reporter.enabled?
+ unless ::Contrast::CONTRAST_SERVICE.unnecessary?
+ @heartbeat = Contrast::Agent::ServiceHeartbeat.new
+ @messaging_queue = Contrast::Api::Communication::MessagingQueue.new
+ end
+ if Contrast::Agent::Reporter.enabled?
+ @reporter = Contrast::Agent::Reporter.new
+ @reporter_heartbeat = Contrast::Agent::ReporterHeartbeat.new
+ end
+ @telemetry = Contrast::Agent::Telemetry::Base.new if Contrast::Agent::Telemetry::Base.enabled?
end
+ # @return [Hash, nil] map of process to thread startup status
def startup!
return unless ::Contrast::AGENT.enabled?
- telemetry_status = telemetry_thread_init
- heartbeat_status = heartbeat_thread_init
- messaging_status = messaging_thread_init
- reporter_status = reporter_thread_init
-
logger.debug('ThreadWatcher started threads')
-
+ heartbeat_status = init_thread(heartbeat)
+ messaging_status = init_thread(messaging_queue)
@pids[Process.pid] = messaging_status && heartbeat_status
- return @pids unless Contrast::Agent::Telemetry.enabled?
-
- @pids[Process.pid] = @pids[Process.pid] && telemetry_status
+ if Contrast::Agent::Telemetry::Base.enabled?
+ telemetry_status = init_thread(telemetry_queue)
+ @pids[Process.pid] = @pids[Process.pid] && telemetry_status
+ end
return @pids unless Contrast::Agent::Reporter.enabled?
- @pids[Process.pid] = @pids[Process.pid] && reporter_status
+ reporter_status = init_thread(reporter)
+ reporter_heartbeat_status = init_thread(reporter_heartbeat)
+ @pids[Process.pid] = @pids[Process.pid] && reporter_status && reporter_heartbeat_status
+ @pids
end
def ensure_running?
return if @pids[Process.pid] == true
logger.debug('ThreadWatcher - threads not running')
startup!
end
def shutdown!
- heartbeat.stop!
- messaging_queue.stop!
- heapdump_util.stop!
+ heartbeat&.stop!
+ messaging_queue&.stop!
+ heapdump_util&.stop!
telemetry_queue&.stop!
reporter&.stop!
+ reporter_heartbeat&.stop!
end
- def heartbeat_thread_init
- unless heartbeat.running?
- logger.debug('Attempting to start heartbeat thread')
- heartbeat.start_thread!
- end
- heartbeat_result = heartbeat.running?
- logger.debug('Heartbeat thread status', alive: heartbeat_result)
- heartbeat_result
- end
-
- def telemetry_thread_init
- @telemetry.start_thread! if @telemetry&.attempt_to_start?
- telemetry_result = @telemetry&.running?
- logger.debug('Telemetry thread status', alive: telemetry_result)
- telemetry_result
- end
-
- def messaging_thread_init
- unless messaging_queue.running?
- logger.debug('Attempting to start messaging queue thread')
- messaging_queue.start_thread!
- end
- messaging_result = messaging_queue.running?
- logger.debug('Messaging thread status', alive: messaging_result)
- messaging_result
- end
-
- def reporter_thread_init
- @reporter.start_thread! if @reporter&.attempt_to_start?
- unless @reporter&.running?
- logger.debug('Attempting to start reporter thread')
- @reporter&.start_thread!
- end
- reporter_result = @reporter&.running?
- logger.debug('Reporter thread status', alive: reporter_result)
- reporter_result
- end
-
- # @return [Contrast::Agent::Telemetry]
+ # @return [Contrast::Agent::Telemetry::Base]
def telemetry_queue
- return if @telemetry.nil?
-
@telemetry
end
- # @return [Contrast::Agent::Reporter]
- def reporter
- return if @reporter.nil?
+ private
- @reporter
+ # Start the thread governed by the given watcher
+ #
+ # @param watcher [Contrast::Agent::ThreadWatcher]
+ # @return [Boolean] if the watched thread started successfully
+ def init_thread watcher
+ return unless watcher&.attempt_to_start?
+
+ unless watcher.running?
+ logger.debug('Attempting to start thread', type: watcher.to_s)
+ watcher.start_thread!
+ end
+ result = watcher.running?
+ logger.debug('Thread status', type: watcher.to_s, alive: result)
+ result
end
end
end
end