# Copyright (c) 2022 Contrast Security, Inc. See https://www.contrastsecurity.com/enduser-terms-0317a for more details.
# frozen_string_literal: true

require 'contrast/components/logger'
require 'contrast/agent/assess/policy/trigger_method'
require 'contrast/agent/worker_thread'
require 'contrast/agent/reporting/reporting_utilities/audit'
require 'contrast/api/dtm.pb'

module Contrast
  module Api
    module Communication
      # Top level gateway to messaging with speedracer
      class MessagingQueue < Contrast::Agent::WorkerThread
        include Contrast::Components::Logger::InstanceMethods

        # @return [Contrast::Api::Communication::Speedracer, nil]
        attr_reader :speedracer

        # Build the Speedracer client and then defer to the parent initializer. When the Contrast Service is
        # unnecessary, neither step happens and #speedracer stays nil.
        def initialize
          return if ::Contrast::CONTRAST_SERVICE.unnecessary?

          @speedracer = Contrast::Api::Communication::Speedracer.new
          super
        end

        # Use this to bypass the messaging queue and leave response processing to the caller. We use this method for
        # those operations which are blocking, meaning they must complete for the Agent to continue operations. These
        # types of events include initial settings/ setup on startup and analysis required to complete before handing
        # execution from our pre-filter operations to the application code (i.e. Protect's input analysis).
        #
        # @param event [Contrast::Api::Dtm] One of the DTMs valid for the event field of
        #   Contrast::Api::Dtm::Message|Contrast::Api::Dtm::Activity
        # @return [Contrast::Api::Settings::AgentSettings,nil]
        def send_event_immediately event
          if ::Contrast::AGENT.disabled?
            logger.warn('Attempted to send event immediately with Agent disabled', caller: caller, event: event)
            return
          end
          return if ::Contrast::CONTRAST_SERVICE.unnecessary?

          speedracer.return_response(event)
        end

        # A simple Queue used to hold messages that are ready to be sent to SpeedRacer but for which we do not need an
        # immediate response. The Queue is created lazily on first access.
        #
        # @return [Queue, nil] nil when the Contrast Service is unnecessary
        def queue
          return if ::Contrast::CONTRAST_SERVICE.unnecessary?

          @_queue ||= Queue.new
        end

        # Use this to add a message to the queue and process the response internally. We use this method for those
        # operations which are non-blocking, meaning they don't need to complete for the Agent to continue operations.
        # These types of events include any post-request messaging that occurs after execution is returned from the
        # application to our post-filter operations as well as those that don't alter the application's execution (i.e.
        # Assess' vulnerability reporting).
        #
        # @param event [Contrast::Api::Dtm] One of the DTMs valid for the event field of
        #   Contrast::Api::Dtm::Message|Contrast::Api::Dtm::Activity
        # @param force [Boolean] if we should always queue this event, even if the service isn't running, in case the
        #   message may be ready before the service has initiated. usually for those events which happen in a separate
        #   thread or which may occur during initialization.
        # @return [Queue, nil] the result of pushing onto the queue when the event was enqueued; nil otherwise
        def send_event_eventually event, force: false
          if ::Contrast::AGENT.disabled?
            logger.warn('Attempted to queue event with Agent disabled', caller: caller, event: event)
            return
          end
          return if ::Contrast::CONTRAST_SERVICE.unnecessary?

          # If we're unable to start the Service, then we cannot afford to queue messages as this creates the potential
          # for a memory leak, especially if the thread responsible for de-queueing the messages is dead.
          return if !force && !(speedracer.status.connected? && running?)

          # If we're in direct communication mode, the only message we should queue is the heartbeat to keep the
          # service alive during Protect activities. All other messages should go through direct reporting.
          if ::Contrast::CONTRAST_SERVICE.use_agent_communication?
            return unless ::Contrast::PROTECT.enabled?
            return unless event.cs__is_a?(Contrast::Api::Dtm::Poll)
          end

          # A nil event is never enqueued, so bail out before logging; otherwise the debug log would claim an
          # enqueue that did not happen.
          return unless event

          logger.debug('Enqueued event for sending', event_type: event.cs__class)
          queue << event
        end

        # Create the reporting thread, which will pull from the queue in order to send messages to SpeedRacer. If
        # SpeedRacer is not running and should be, meaning the Agent is configured to control it, then we will also
        # try to start that process.
        def start_thread!
          return if ::Contrast::CONTRAST_SERVICE.unnecessary?

          speedracer.ensure_startup!
          return if running?

          @_thread = Contrast::Agent::Thread.new do
            loop do
              event = queue.pop

              begin
                logger.debug('Dequeued event for sending', event_type: event.cs__class)
                speedracer.process_internally(event)
              rescue StandardError => e
                # A failure on one message is logged and the loop continues with the next message.
                logger.error('Could not send message to service from messaging queue thread.', e)
              end
            end
          end
          logger.debug('Started background sending thread.')
        end

        # When the Agent shuts down, we terminate the message sending operations. This method clears, closes, and
        # destroys the queue.
        def delete_queue!
          @_queue&.clear
          @_queue&.close
          @_queue = nil
        end

        # When the Agent shuts down, we terminate the thread responsible for reporting. This method destroys that
        # thread and any data we would have sent with it. It does nothing when the thread is not running.
        def stop!
          return unless running?

          super
          delete_queue!
        end
      end
    end
  end
end