# Copyright (c) 2020 Contrast Security, Inc. See https://www.contrastsecurity.com/enduser-terms-0317a for more details.
# frozen_string_literal: true

cs__scoped_require 'contrast/components/interface'

module Contrast
  module Utils
    # This module keeps a background thread, initialized at startup, running. This thread is
    # responsible for pulling a message off the queue, sending that message to SpeedRacer, and
    # processing the response returned back from SpeedRacer. When we didn't know our config
    # values, we may have queued assess messages to send later. This thread is also
    # responsible for checking to see if we are now ready to send any of these assess messages
    # to SpeedRacer.
    module ServiceSenderUtil
      include Contrast::Components::Interface
      access_component :contrast_service, :logging, :analysis, :agent

      class << self
        attr_reader :assess_messages, :ready_messages_queue

        # Push the given DTM into a queue to be sent to the Service.
        # A nil event is ignored.
        #
        # @param event [Contrast::Api::Dtm] One of the DTMs valid for the event field of
        #   Contrast::Api::Dtm::Message
        def push_to_ready_queue event
          return unless event

          # Since a message may try to be pushed before sending_thread starts up, we need to
          # initialize the queue on first message sent
          @ready_messages_queue ||= Queue.new
          ready_messages_queue.push event
          # TODO: RUBY-794 make Type & ID structured for parsing
          logger.debug("Enqueued the #{ event.cs__class.name } event #{ event.__id__ }.")
        end

        # Ensure the background sending thread is running, creating it if needed.
        #
        # The thread loops forever: it first flushes any held assess messages (while that
        # holding list still exists), then blocks on the ready queue, sends the dequeued
        # event and hands any response to ServiceResponseUtil. Errors raised while sending
        # or processing a response are logged and do not terminate the loop.
        #
        # A new thread is created whenever no live one is held, so calling this again after
        # #stop (or after the previous thread has died for any reason) restarts sending
        # instead of silently keeping a dead thread. Calling it while the thread is alive
        # is a no-op.
        def sending_thread
          # in case we got here before a message attempted to be sent, let's make sure these
          # are initialized
          @ready_messages_queue ||= Queue.new
          @assess_messages ||= []

          # Check liveness rather than mere presence: a memoized but dead thread would
          # otherwise prevent a replacement from ever being started.
          return if @_sender_thread&.alive?

          @_sender_thread = Contrast::Agent::Thread.new do
            loop do
              # don't bother checking if assess queue is already cleared
              check_assess_queue if assess_messages

              # if ready messages queue is empty, calling thread is suspended until a message
              # is pushed onto queue
              event = ready_messages_queue.pop
              begin
                # TODO: RUBY-794 make Type & ID structured for parsing
                logger.debug("Dequeued the #{ event.cs__class.name } event #{ event.__id__ }.")
                response = Contrast::Agent::FeatureState.instance.client.send_to_speedracer event
                Contrast::Utils::ServiceResponseUtil.process_response(response) if response
              rescue StandardError => e
                logger.error(e, 'Could not send message to service from service sender thread.')
              end
            end
          end
          logger.debug(nil, 'Started background sending thread.')
        end
        alias_method :start, :sending_thread

        # Kill the background sending thread, if one is alive, and forget it so that a later
        # call to #start creates a fresh thread. Safe to call when no thread was started.
        def stop
          return unless @_sender_thread

          Thread.kill(@_sender_thread) if @_sender_thread.alive?
          @_sender_thread = nil
        end

        # Hold a message until we know whether it should be sent.
        #
        # @param msg [Object] the message to hold; presumably a Contrast::Api::Dtm message
        #   carrying assess findings - confirm against callers
        def add_to_assess_messages msg
          # This list holds messages containing assess findings. We currently
          # do not know if the agent is running in assess, so until we do then
          # we must keep the messages stored here.
          @assess_messages ||= []
          assess_messages << msg
        end

        private

        # Once the agent reports ready, move held assess messages onto the ready queue
        # (only if assess is enabled; otherwise they are dropped) and discard the holding
        # list. Does nothing while the agent is not ready.
        def check_assess_queue
          return unless AGENT.ready?

          if ASSESS.enabled?
            assess_messages.each do |msg|
              update_queued_finding msg
              ready_messages_queue.push msg # this message is ready to be sent to SR
            end
          end
          @assess_messages = nil # clear out queue, we no longer need it
        end

        # When we queued up findings, we didn't have access to a configuration
        # object. As such, we have to make decisions now for those findings:
        # apply the configured tags, remove findings whose rule is disabled and
        # stamp the remaining findings with the current session id.
        #
        # @param msg [Object] a held message; anything other than a
        #   Contrast::Api::Dtm::Activity is left untouched
        def update_queued_finding msg
          # Findings only exist in activity messages
          return unless msg.is_a?(Contrast::Api::Dtm::Activity)

          # So set their tags
          msg.finding_tags = Contrast::Utils::StringUtils.force_utf8(ASSESS.tags)

          # and see if they're even enabled; iterate over a copy so that deleting from
          # msg.findings does not disturb the iteration
          check = msg.findings.dup
          check.each do |finding|
            msg.findings.delete(finding) if ASSESS.rule_disabled?(finding.rule_id)
          end

          msg.findings.each do |finding|
            finding.session_id = Contrast::Agent::FeatureState.instance.current_session_id
          end
        end
      end
    end
  end
end