# Copyright (c) 2022 Contrast Security, Inc. See https://www.contrastsecurity.com/enduser-terms-0317a for more details. # frozen_string_literal: true require 'contrast/components/logger' require 'contrast/agent/worker_thread' require 'contrast/agent/reporting/reporting_utilities/audit' module Contrast module Api module Communication # Top level gateway to messaging with speedracer class MessagingQueue < Contrast::Agent::WorkerThread include Contrast::Components::Logger::InstanceMethods attr_reader :speedracer def initialize @speedracer = Contrast::Api::Communication::Speedracer.new @audit = Contrast::Agent::Reporting::Audit.new if ::Contrast::API.request_audit_enable? super end # Use this to bypass the messaging queue and leave response processing to the caller. We use this method for # those operations which are blocking, meaning they must complete for the Agent to continue operations. These # types of events include initial settings/ setup on startup and analysis required to complete before handing # execution from our pre-filter operations to the application code (i.e. Protect's input analysis). # # @param event [Contrast::Api::Dtm] One of the DTMs valid for the event field of # Contrast::Api::Dtm::Message|Contrast::Api::Dtm::Activity # @return [Contrast::Api::Settings::AgentSettings,nil] def send_event_immediately event if ::Contrast::AGENT.disabled? logger.warn('Attempted to send event immediately with Agent disabled', caller: caller, event: event) return end response_data = speedracer.return_response(event) return response_data unless ::Contrast::API.request_audit_enable? @audit&.audit_event(event, response_data) response_data end # A simple Queue used to hold messages that are ready to be sent to SpeedRacer but for which we do not need an # immediate response # # @return [Queue] def queue @_queue ||= Queue.new end # Use this to add a message to the queue and process the response internally. We use this method for those # operations which are non-blocking, meaning they don't need to complete for the Agent to continue operations. # These types of events include any post-request messaging that occurs after execution is returned from the # application to our post-filter operations as well as those that don't alter the application's execution (i.e. # Assess' vulnerability reporting). # # @param event [Contrast::Api::Dtm] One of the DTMs valid for the event field of # Contrast::Api::Dtm::Message|Contrast::Api::Dtm::Activity def send_event_eventually event if ::Contrast::AGENT.disabled? logger.warn('Attempted to queue event with Agent disabled', caller: caller, event: event) return end logger.debug('Enqueued event for sending', event_type: event.cs__class) queue << event if event return unless ::Contrast::API.request_audit_enable? @audit&.audit_event(event) end # Create the reporting thread, which will pull from the queue in order to send messages to SpeedRacer. If # SpeedRacer is not running and should be, meaning the Agent is configured to control it, then we will also # try to start that process. def start_thread! speedracer.ensure_startup! return if running? @_thread = Contrast::Agent::Thread.new do loop do event = queue.pop begin logger.debug('Dequeued event for sending', event_type: event.cs__class) speedracer.process_internally(event) rescue StandardError => e logger.error('Could not send message to service from messaging queue thread.', e) end end end logger.debug('Started background sending thread.') end # When the Agent shuts down, we terminate the message sending operations. This method clears, closes, and # destroys the queue. def delete_queue! @_queue&.clear @_queue&.close @_queue = nil end # When the Agent shuts down, we terminate the thread responsible for reporting. This method destroys that # thread and any data we would have sent with it. def stop! return unless running? super delete_queue! end end end end end