# Copyright (c) 2020 Contrast Security, Inc. See https://www.contrastsecurity.com/enduser-terms-0317a for more details.
# frozen_string_literal: true

cs__scoped_require 'contrast/utils/stack_trace_utils'
cs__scoped_require 'contrast/utils/object_share'
cs__scoped_require 'contrast/components/interface'

module Contrast
  module Agent
    module Protect
      module Rule
        # The Ruby implementation of the Protect Command Injection rule.
        #
        # The rule inspects a command that is about to be executed, compares
        # it against the input analysis results gathered for the current
        # request, records any resulting attack on the request's activity and,
        # when the rule is blocking, raises a Contrast::SecurityException.
        class CmdInjection < Contrast::Agent::Protect::Rule::BaseService
          include Contrast::Components::Interface
          # NOTE(review): presumably this is what provides `logger` and
          # `CONTRAST_SERVICE` used below - confirm against
          # Contrast::Components::Interface.
          access_component :logging, :contrast_service

          NAME = 'cmd-injection'
          # Shell metacharacters used to chain or redirect commands.
          CHAINED_COMMAND_CHARS = /[;&|<>]/.cs__freeze

          # @return [String] the identifier of this rule.
          def name
            NAME
          end

          # Evaluate a command that is about to be executed.
          #
          # @param context [Contrast::Agent::RequestContext] the context of
          #   the current request.
          # @param classname [String] name of the class owning the invoked
          #   method; used in the exception message.
          # @param method [Symbol] the invoked method; :exec and :` receive
          #   special handling (see below).
          # @param command [String] the command being executed.
          # @return [nil] on every path that does not raise.
          # @raise [Contrast::SecurityException] if an attack result was
          #   produced and #blocked? is true.
          def infilter context, classname, method, command
            return nil unless infilter?(context)

            # Extract the results before the context is potentially replaced
            # below.
            ia_results = gather_ia_results(context)
            return nil if ia_results.empty?

            if Contrast::Agent::FeatureState.instance.in_new_process?
              logger.debug('Running cmd-injection infilter within new process - creating new context')
              context = Contrast::Agent::RequestContext.new(context.request.rack_request)
              Contrast::Agent::REQUEST_TRACKER.update_current_context(context)
            end

            result = find_attacker_with_results(context, command, ia_results, classname: classname, method: method)
            result ||= report_command_execution(context, command, classname: classname, method: method)
            return nil unless result

            append_to_activity(context, result)
            if %I[exec `].include?(method)
              # TODO: RUBY-737
              # Kernel#exec replaces the current process and does not go through at_exit hooks
              # Kernel#` runs as a subshell - messages appended here do not seem to be present in the original process?
              CONTRAST_SERVICE.send_message(context.activity)
            end
            return nil unless blocked?

            raise Contrast::SecurityException.new(
              self,
              "Command Injection rule triggered. Call to #{ classname }.#{ method } blocked.")
          end

          # Create the attack result (when one was not supplied), update its
          # response and append a sample for the matched command.
          #
          # @param context [Contrast::Agent::RequestContext]
          # @param input_analysis_result [Object, nil] the analysis result
          #   that matched; nil when reporting an execution without a known
          #   input.
          # @param result [Object, nil] an existing attack result to extend.
          # @param candidate_string [String] the command being evaluated.
          # @param kwargs [Hash] forwarded unchanged to append_sample.
          # @return [Object, nil] the attack result; the given result is
          #   returned untouched when the mode is :NO_ACTION or :PERMIT.
          def build_attack_with_match context, input_analysis_result, result, candidate_string, **kwargs
            return result if mode == :NO_ACTION || mode == :PERMIT

            result ||= build_attack_result(context)
            update_successful_attack_response(context, input_analysis_result, result, candidate_string)
            append_sample(context, input_analysis_result, result, candidate_string, **kwargs)
            result
          end

          protected

          # Because results are not necessarily on the context across
          # processes; extract early and pass into the method.
          #
          # Defers to the parent implementation first and, when that finds
          # nothing and a command is present, falls back to the
          # chained-command heuristic of #find_probable_attacker.
          #
          # @param context [Contrast::Agent::RequestContext]
          # @param potential_attack_string [String, nil] the command to check.
          # @param ia_results [Enumerable] the input analysis results
          #   gathered for the request.
          # @param kwargs [Hash] forwarded unchanged.
          # @return [Object, nil] the attack result, if any.
          def find_attacker_with_results context, potential_attack_string, ia_results, **kwargs
            logger.debug("checking: #{ name } in '#{ potential_attack_string }'")
            result = super(context, potential_attack_string, ia_results, **kwargs)
            if result.nil? && potential_attack_string
              result = find_probable_attacker(context, potential_attack_string, ia_results, **kwargs)
            end
            result
          end

          # Build the sample for this rule from the base sample, attaching
          # the command injection details that carry the evaluated command.
          #
          # @param context [Contrast::Agent::RequestContext]
          # @param input_analysis_result [Object, nil] the matched analysis
          #   result, or nil when the user input is unknown.
          # @param candidate_string [String, nil] the command; when absent
          #   the value of the analysis result is used instead.
          # @return [Object] the populated sample.
          def build_sample context, input_analysis_result, candidate_string, **_kwargs
            sample = build_base_sample(context, input_analysis_result)
            sample.cmdi = Contrast::Api::Dtm::CmdInjectionDetails.new

            # A nil input_analysis_result is an accepted case (handled
            # below), so it must not be dereferenced unconditionally when
            # candidate_string is also missing.
            # NOTE(review): assumes protobuf_safe_string tolerates nil -
            # confirm against Contrast::Utils::StringUtils.
            command = candidate_string || input_analysis_result&.value
            command = Contrast::Utils::StringUtils.protobuf_safe_string(command)
            sample.cmdi.command = command

            # This is a special case where the user input is UNKNOWN_USER_INPUT but
            # we want to send the attack value
            if input_analysis_result.nil?
              ui = Contrast::Api::Dtm::UserInput.new
              ui.input_type = :UNKNOWN
              ui.value = command
              sample.user_input = ui
            end

            sample
          end

          private

          # Report the execution of a command even though no input matched
          # it, provided that hardening is enabled and the call is not
          # excluded.
          #
          # @param context [Contrast::Agent::RequestContext]
          # @param command [String] the command being executed.
          # @param kwargs [Hash] forwarded to #build_attack_with_match.
          # @return [Object, nil] the attack result, or nil when reporting
          #   is disabled or excluded.
          def report_command_execution context, command, **kwargs
            return nil unless report_any_command_execution?
            return nil if protect_excluded_by_code?

            build_attack_with_match(context, nil, nil, command, **kwargs)
          end

          # When the command itself contains chaining characters, treat the
          # first analysis result whose value also contains them as the
          # probable source of the attack.
          #
          # @param context [Contrast::Agent::RequestContext]
          # @param potential_attack_string [String] the command to check.
          # @param ia_results [Enumerable] the input analysis results.
          # @param kwargs [Hash] forwarded to #build_attack_with_match.
          # @return [Object, nil] the attack result, or nil when the command
          #   is not chained, no result matches, or no attack was built.
          def find_probable_attacker context, potential_attack_string, ia_results, **kwargs
            return nil unless chained_command?(potential_attack_string)

            # this is probably an attack
            most_likely = ia_results.find { |ia_result| chained_command?(ia_result.value) }
            return nil unless most_likely

            result = build_attack_with_match(context, most_likely, nil, potential_attack_string, **kwargs)
            return nil if result.nil?

            log_rule_matched(context, most_likely, mode, potential_attack_string)
            result
          end

          # @param command [String, nil] the text to inspect.
          # @return [Boolean] whether the text contains any of the
          #   CHAINED_COMMAND_CHARS.
          def chained_command? command
            CHAINED_COMMAND_CHARS.match?(command)
          end

          # Part of the Hardening for Command Injection detection is the
          # ability to detect and prevent any command execution from within the
          # application. This check determines if that hardening has been
          # enabled.
          # @return [Boolean] if the agent should report all command
          #   executions.
          def report_any_command_execution?
            Contrast::Agent::FeatureState.instance.report_any_command_execution?
          end
        end
      end
    end
  end
end