# Copyright (c) 2020 Contrast Security, Inc. See https://www.contrastsecurity.com/enduser-terms-0317a for more details.
# frozen_string_literal: true

require 'contrast/agent/at_exit_hook'
require 'contrast/agent/protect/rule/base_service'
require 'contrast/utils/stack_trace_utils'
require 'contrast/utils/object_share'
require 'contrast/components/interface'

module Contrast
  module Agent
    module Protect
      module Rule
        # The Ruby implementation of the Protect Command Injection rule.
        class CmdInjection < Contrast::Agent::Protect::Rule::BaseService
          include Contrast::Components::Interface
          access_component :app_context, :logging

          NAME = 'cmd-injection'
          # Shell metacharacters used here as the signal that a command
          # string chains, pipes or redirects.
          CHAINED_COMMAND_CHARS = /[;&|<>]/.cs__freeze

          # @return [String] the identifier of this rule
          def name
            NAME
          end

          # Evaluate a command that is about to be executed. If an attack is
          # found (or every command execution is being reported), the result
          # is appended to the activity and, when this rule is blocking, a
          # SecurityException is raised.
          #
          # @param context [Contrast::Agent::RequestContext] the context of
          #   the current request
          # @param classname [String] name of the class whose method is
          #   executing the command; used in the sample and the error message
          # @param method [String, Symbol] name of the method executing the
          #   command
          # @param command [String] the command being executed
          # @return [nil]
          # @raise [Contrast::SecurityException] if an attack was found and
          #   the rule is blocking
          def infilter context, classname, method, command
            return nil unless infilter?(context)

            ia_results = gather_ia_results(context)
            return nil if ia_results.empty?

            if APP_CONTEXT.in_new_process?
              logger.trace('Running cmd-injection infilter within new process - creating new context')
              context = Contrast::Agent::RequestContext.new(context.request.rack_request)
              Contrast::Agent::REQUEST_TRACKER.update_current_context(context)
            end

            result = find_attacker_with_results(context, command, ia_results, classname: classname, method: method)
            result ||= report_command_execution(context, command, classname: classname, method: method)
            return nil unless result

            append_to_activity(context, result)
            return unless blocked?

            raise Contrast::SecurityException.new(
                self,
                "Command Injection rule triggered. Call to #{ classname }.#{ method } blocked.")
          ensure
            # Kernel#exec replaces the current process and does not go through
            # at_exit hooks. Kernel#` runs as a subshell - messages appended
            # here do not seem to be present in the original process.
            #
            # The respond_to? guard keeps an unexpected method value (e.g.
            # nil) from raising inside this ensure clause, which would
            # replace the SecurityException raised above.
            Contrast::Agent::AtExitHook.on_exit if method.respond_to?(:to_sym) && %i[exec `].include?(method.to_sym)
          end

          # Record a matched attack on the result, creating the result first
          # if one was not supplied. Nothing is recorded when the rule's mode
          # is :NO_ACTION or :PERMIT; the given result is returned untouched.
          #
          # @param context [Contrast::Agent::RequestContext]
          # @param input_analysis_result [Object, nil] the analysis result that
          #   matched, or nil when the input is unknown
          # @param result [Object, nil] an existing attack result to add to
          # @param candidate_string [String] the command that was executed
          # @param kwargs [Hash] extra details forwarded to append_sample
          # @return [Object, nil] the (possibly newly built) attack result
          def build_attack_with_match context, input_analysis_result, result, candidate_string, **kwargs
            return result if mode == :NO_ACTION || mode == :PERMIT

            result ||= build_attack_result(context)
            update_successful_attack_response(context, input_analysis_result, result, candidate_string)
            append_sample(context, input_analysis_result, result, candidate_string, **kwargs)
            result
          end

          protected

          # Because results are not necessarily on the context across
          # processes; extract early and pass into the method
          #
          # Falls back to find_probable_attacker when the parent
          # implementation finds nothing and there is a command to inspect.
          #
          # @param context [Contrast::Agent::RequestContext]
          # @param potential_attack_string [String, nil] the command
          # @param ia_results [Enumerable] the input analysis results gathered
          #   for this rule
          # @param kwargs [Hash] extra details forwarded when building samples
          # @return [Object, nil] the attack result, if any
          def find_attacker_with_results context, potential_attack_string, ia_results, **kwargs
            logger.trace('Checking vectors for attacks', rule: name, input: potential_attack_string)
            result = super(context, potential_attack_string, ia_results, **kwargs)
            return result unless result.nil? && potential_attack_string

            find_probable_attacker(context, potential_attack_string, ia_results, **kwargs)
          end

          # Build a subclass of the RaspRuleSample using the query string and the
          # evaluation
          #
          # @param context [Contrast::Agent::RequestContext]
          # @param input_analysis_result [Object, nil] the matched analysis
          #   result, or nil when the user input is unknown
          # @param candidate_string [String, nil] the executed command; when
          #   nil the value of the analysis result is used instead
          # @return [Object] the populated sample
          def build_sample context, input_analysis_result, candidate_string, **_kwargs
            sample = build_base_sample(context, input_analysis_result)
            sample.cmdi = Contrast::Api::Dtm::CmdInjectionDetails.new

            # input_analysis_result may legitimately be nil (see below), so it
            # must not be dereferenced blindly when there is no candidate.
            # NOTE(review): assumes protobuf_safe_string tolerates nil when
            # neither value is available - confirm.
            command = candidate_string || (input_analysis_result && input_analysis_result.value)
            command = Contrast::Utils::StringUtils.protobuf_safe_string(command)
            sample.cmdi.command = command

            # This is a special case where the user input is UNKNOWN_USER_INPUT but
            # we want to send the attack value
            if input_analysis_result.nil?
              ui = Contrast::Api::Dtm::UserInput.new
              ui.input_type = :UNKNOWN
              ui.value = command
              sample.user_input = ui
            end

            sample
          end

          private

          # Build an attack result for a command execution that did not match
          # any user input, when reporting of all command executions is
          # enabled and the call is not excluded by code.
          #
          # @return [Object, nil] the attack result, or nil if not reported
          def report_command_execution context, command, **kwargs
            return nil unless report_any_command_execution?
            return nil if protect_excluded_by_code?

            build_attack_with_match(context, nil, nil, command, **kwargs)
          end

          # When the command itself contains a chaining character, treat the
          # first analysis result whose value also contains one as the
          # probable source of the attack.
          #
          # @return [Object, nil] the attack result, or nil if the command is
          #   not chained or no analysis result is chained
          def find_probable_attacker context, potential_attack_string, ia_results, **kwargs
            return nil unless chained_command?(potential_attack_string)

            # this is probably an attack
            most_likely = ia_results.find { |input_analysis_result| chained_command?(input_analysis_result.value) }
            return nil unless most_likely

            build_attack_with_match(context, most_likely, nil, potential_attack_string, **kwargs)
          end

          # @param command [String, nil] the text to inspect
          # @return [Boolean] if the text contains any of the
          #   CHAINED_COMMAND_CHARS
          def chained_command? command
            CHAINED_COMMAND_CHARS.match(command) ? true : false
          end

          # Part of the Hardening for Command Injection detection is the
          # ability to detect and prevent any command execution from within the
          # application. This check determines if that hardening has been
          # enabled.
          # @return [Boolean] if the agent should report all command
          #   executions.
          def report_any_command_execution?
            PROTECT.report_any_command_execution?
          end
        end
      end
    end
  end
end