# Copyright (c) 2020 Contrast Security, Inc. See https://www.contrastsecurity.com/enduser-terms-0317a for more details. # frozen_string_literal: true cs__scoped_require 'contrast/components/interface' module Contrast module Utils # This module keeps a background thread, initialized at startup, running. This thread is # responsible for pulling a message off the queue, sending that message to SpeedRacer, and # processing the response returned back from SpeedRacer. When we didn't know our config # values, we may have queued assess messages to send later. This thread is also # responsible for checking to see if we are now ready to send any of these assess messages # to SpeedRacer. module ServiceSenderUtil include Contrast::Components::Interface access_component :analysis, :app_context, :logging, :settings class << self # Push the given DTM into a queue to be sent to the Service. # # @param event [Contrast::Api::Dtm] One of the DTMs valid for the event field of # Contrast::Api::Dtm::Message def push_to_ready_queue event return unless event prep_queue ready_messages_queue.push event logger.debug('Enqueued event', event_id: event.__id__, event_type: event.cs__class.name, queue: ready_messages_queue.__id__, length: ready_messages_queue.length) end # Bypass the queue and send the message directly to SpeedRacer. This is the only way # to receive the response object back from SpeedRacer. An invoking method that needs # to process the response object with custom logic should use this method. # # @param event [Contrast::Api::Dtm] One of the DTMs valid for the event # field of Contrast::Api::Dtm::Message # @return [Array] the response from SpeedRacer def send_event_immediately event client.send_to_speedracer(event) if event end def sending_thread return @_sender_thread if @_sender_thread # in case we got here before a message attempted to be sent, let's make sure these are initialized @_sender_thread ||= Contrast::Agent::Thread.new do loop do check_assess_queue # if ready messages queue is empty, calling thread is suspended until a message is pushed onto queue event = ready_messages_queue.pop begin logger.debug('Dequeued event', event_id: event.__id__, event_type: event.cs__class.name, queue: ready_messages_queue.__id__, length: ready_messages_queue.length) response = client.send_to_speedracer(event) Contrast::Utils::ServiceResponseUtil.process_response(response) if response rescue StandardError => e logger.error('Could not send message to service from service sender thread.', e) end end end logger.debug('Started background sending thread.') end alias_method :start, :sending_thread def stop Thread.kill(@_sender_thread) if @_sender_thread&.alive? end def add_to_assess_messages msg assess_messages! << msg end # Return true if the agent has connected with the service def connection_established? client.connection_established? end private # TODO: RUBY-920 # If we've been forked, then we need to make sure the sender is set and # alive. This is a temporary workaround to address issues in Passenger # around forking and will be addressed next sprint. def prep_queue return if @_sender_thread&.alive? @_sender_thread = nil start end def ready_messages_queue @_ready_messages_queue ||= begin tmp = Queue.new logger.debug('Creating new ready_messages_queue.', queue: tmp.__id__, length: tmp.length) tmp end end def assess_messages! unless @_assess_messages logger.debug('Creating new assess messages queue by force.') @_assess_messages = nil end assess_messages end # This list holds messages containing assess findings. We may not # know if the agent is running in assess, so until we do then we # must keep the messages stored here. Once we do know, we'll set this # to `false` and rely on the standard ready_messages_queue def assess_messages if @_assess_messages.nil? logger.debug('Creating new assess messages queue.') @_assess_messages = [] end @_assess_messages end def client @_client ||= Contrast::Agent::SocketClient.new end def check_assess_queue return unless assess_messages return unless APP_CONTEXT.ready? if ASSESS.enabled? assess_messages.each do |msg| update_queued_finding msg ready_messages_queue.push msg # this message is ready to be sent to SR end end @_assess_messages = false # clear out queue, we no longer need it end # When we queued up findings, we didn't have access to a configuration # object. As such, we have to make decisions now for those findings. def update_queued_finding msg # Findings only exist in activity messages return unless msg.is_a?(Contrast::Api::Dtm::Activity) # So set their tags msg.finding_tags = Contrast::Utils::StringUtils.force_utf8(ASSESS.tags) # and see if they're even enabled check = msg.findings.dup check.each do |finding| msg.findings.delete(finding) if ASSESS.rule_disabled?(finding.rule_id) end msg.findings.each do |finding| finding.session_id = SETTINGS.session_id end end end end end end