# Copyright (c) 2022 Contrast Security, Inc. See https://www.contrastsecurity.com/enduser-terms-0317a for more details. # frozen_string_literal: true require 'contrast/agent/assess/policy/trigger/reflected_xss' require 'contrast/agent/assess/policy/trigger/xpath' require 'contrast/api/decorators/trace_taint_range_tags' require 'contrast/components/logger' module Contrast module Agent module Assess module Policy # This class functions to translate our policy.json into an actionable # Ruby object, allowing for dynamic patching over hardcoded patching, # specifically for those methods which result in the trigger of a # vulnerability (indicate points in the application where uncontrolled # user input can do damage). class TriggerNode < PolicyNode # rubocop:disable Metrics/ClassLength include Contrast::Components::Logger::InstanceMethods JSON_BAD_VALUE = 'bad_value' JSON_GOOD_VALUE = 'good_value' JSON_DISALLOWED_TAGS = 'disallowed_tags' JSON_REQUIRED_TAGS = 'required_tags' JSON_RULE_NAME = 'name' JSON_CUSTOM_PATCH = 'custom_patch' # Our list with rules to be collected and reported back when we have response # from the application. Some rules rely on Content-Type validation. COLLECTABLE_RULES = %w[reflected-xss].cs__freeze attr_reader :rule_id, :required_tags, :disallowed_tags, :good_value, :bad_value ENCODER_START = 'CUSTOM_ENCODED_' # By default, any rule will be triggered if the source # of the rule event has an untrusted tag range that is # not covered by one of its disallowed tags. UNTRUSTED = 'UNTRUSTED' TRIGGER = 'Trigger' VALIDATOR_START = 'CUSTOM_VALIDATED_' # If a level 1 rule comes from TeamServer, it will have the # tag 'custom-encoder-#{ name }' or 'custom-validator-#{ name }'. # All rules should take this into account. # Additionally, if something is marked 'limited-chars' it means # it has been properly vetted to not contain dangerous input. LIMITED_CHARS = 'LIMITED_CHARS' CUSTOM_ENCODED = 'CUSTOM_ENCODED' CUSTOM_VALIDATED = 'CUSTOM_VALIDATED' def initialize trigger_hash = {}, rule_hash = {} super(trigger_hash) good_value = trigger_hash[JSON_GOOD_VALUE] bad_value = trigger_hash[JSON_BAD_VALUE] @good_value = Regexp.new(good_value, true) if good_value @bad_value = Regexp.new(bad_value, true) if bad_value @regexp = !@dataflow && (@good_value || @bad_value) @custom_patch = trigger_hash.fetch(JSON_CUSTOM_PATCH, false) @rule_id = rule_hash.fetch(JSON_RULE_NAME) # raises KeyError exception if not found @dataflow = rule_hash.fetch(JSON_DATAFLOW, true) @required_tags = populate_tags(rule_hash[JSON_REQUIRED_TAGS]) @disallowed_tags = populate_disallowed(rule_hash[JSON_DISALLOWED_TAGS]) @trigger_class = trigger_hash['trigger_class'] @trigger_method = trigger_hash['trigger_method'] @trigger_method = @trigger_method.to_sym if @trigger_method validate rescue ArgumentError => e logger.error('Trigger Node initialization failed with: ', e) nil end def node_class TRIGGER end def apply_custom_trigger trigger_node, source, object, ret, *args custom_trigger_class.send(@trigger_method, trigger_node, source, object, ret, *args) end def custom_trigger_class @_custom_trigger_class ||= Object.cs__const_get(@trigger_class) end def custom_trigger? @trigger_class && @trigger_method end def custom_patch? @custom_patch end def node_type :TYPE_METHOD end def collectable? COLLECTABLE_RULES.include?(rule_id) end def rule_disabled? ::Contrast::ASSESS.rule_disabled?(rule_id) end # Indicate if this is a dataflow based trigger, meaning it has a proper # synch that requires a tainted source to reach it. If this returns # false, this rule is for method validation, ensuring that an insecure # method, such as a non-cryptographically secure random, is not invoked def dataflow? @dataflow end # Indicate if this is a regexp based trigger, meaning it has a proper # synch that requires a source to reach it. While this type of rule # does not require the source to be tainted, it does validate it with # a regular expression to determine if the method is being invoked # safely. def regexp_rule? @regexp end # the name of the rule, in capital & underscore format # used to make it match enum things in TeamServer def loud_name @_loud_name ||= rule_id.upcase.gsub(Contrast::Utils::ObjectShare::DASH, Contrast::Utils::ObjectShare::UNDERSCORE) end # Determine if a dataflow rule violation has occurred def violated? source # if the source isn't tracked, there can't be a violation # this condition may not hold true forever, but for now it's # a nice optimization return false unless Contrast::Agent::Assess::Tracker.tracked?(source) properties = Contrast::Agent::Assess::Tracker.properties(source) # find the ranges that violate the rule (untrusted, etc) vulnerable_ranges = ranges_with_all_tags(properties, required_tags) # if there aren't any vulnerable ranges, nope out return false if vulnerable_ranges.empty? # find the ranges that are exempt from the rule # (validated, sanitized, etc) secure_ranges = ranges_with_any_tag(properties, disallowed_tags) # if there are vulnerable ranges and no secure, report return true if secure_ranges.empty? # figure out if there are any vulnerable ranges that aren't # covered by a secure one. if there are, the rule was violated !Contrast::Utils::TagUtil.covered?(vulnerable_ranges, secure_ranges) end # Standard validation + TS trace version two rules: # Must have source # @raise[ArgumentError] raises if any of the required fields is invalid or missing def validate super # If this isn't a dataflow rule, it can't have a source return unless dataflow? raise(ArgumentError, "Trigger #{ id } did not have a proper source. Unable to create.") unless sources&.any? end private def populate_tags required_tags return unless dataflow? validate_rule_tags(required_tags) @required_tags = Set.new(required_tags) @required_tags << UNTRUSTED end def populate_disallowed disallowed_tags return unless dataflow? validate_rule_tags(disallowed_tags) @disallowed_tags = Set.new(disallowed_tags) @disallowed_tags << LIMITED_CHARS @disallowed_tags << CUSTOM_ENCODED @disallowed_tags << CUSTOM_VALIDATED @disallowed_tags << (ENCODER_START + loud_name) @disallowed_tags << (VALIDATOR_START + loud_name) end # @raise[ArgumentError] raises if any of the tags is invalid def validate_rule_tags tags return unless tags tags.each do |tag| raise(ArgumentError, "Rule #{ rule_id } had an invalid tag. #{ tag } is not a known value.") unless Contrast::Api::Decorators::TraceTaintRangeTags::VALID_TAGS.include?(tag) || Contrast::Api::Decorators::TraceTaintRangeTags::VALID_SOURCE_TAGS.include?(tag) end end # Find the ranges that satisfy all of the given tags. # # @param properties [Contrast::Agent::Assess::Properties] the # properties to check for the tags # @param required_tags [Set] the list of tags on which to match # @return [Array] the ranges satisfied # by the given conditions def ranges_with_all_tags properties, required_tags return Contrast::Utils::ObjectShare::EMPTY_ARRAY unless matches_tags?(properties, required_tags) chunking = false ranges = [] # find the start and end range of required tags: search_ranges = find_required_ranges(properties, required_tags) start_range = search_ranges.first end_range = search_ranges.last + 1 # find all the indicies on the source that have all the given tags while start_range < end_range tags_at = properties.tags_at(start_range).to_a # only those that have all the required tags in the tags_at # satisfy the requirement satisfied = tags_at.any? && required_tags.all? { |tag| tags_at.any? { |found| found.label == tag } } # if this range matches all the required tags and we're already # chunking, meaning the previous range matched, do nothing # if we are satisfied and we were not chunking, this represents # the start of the next range, so create a new entry. if satisfied if chunking start_range += 1 next end ranges << Contrast::Agent::Assess::Tag.new('required', 0, start_range) chunking = true # if we are chunking and not satisfied, this represents the end # of the range, meaning the last index is what satisfied the # range. Because the range is exclusive end, we can just use this # index. elsif chunking ranges[-1]&.update_end(start_range) chunking = false end start_range += 1 end ranges end # Find the ranges that satisfy any of the given tags. # # @param properties [Contrast::Agent::Assess::Properties] the # properties to check for the tags # @param tags [Set] the list of tags on which to match # @return [Array] the ranges satisfied # by the given conditions def ranges_with_any_tag properties, tags # if there aren't any all_tags or tags, break early return Contrast::Utils::ObjectShare::EMPTY_ARRAY unless search_tags?(properties, tags) ranges = [] tags.each do |desired| found = properties.fetch_tag(desired) next unless found # we need to dup here so that we don't change the tags if target is # used in another trace ranges = Contrast::Utils::TagUtil.ordered_merge(ranges, found.dup) end ranges end # We should only try to match tags on properties if those properties have any tags (are tracked) and there # are tags to try and match on. Some rules, like regexp rules, have no tags. Some rules, like trigger, have # no properties. # # @param properties [Contrast::Agent::Assess::Properties] the properties to check for the tags # @param tags [Set] the list of tags on which to match # @return [Boolean] if the given properties has instances of every tag in tags def search_tags? properties, tags return false unless properties.tracked? return false unless tags&.any? true end # Determine if the given properties have instances of all the given tags or not. # # @param properties [Contrast::Agent::Assess::Properties] the properties to check for the tags # @param tags [Set] the list of tags on which to match # @return [Boolean] if the given properties has instances of every tag in tags def matches_tags? properties, tags return false unless search_tags?(properties, tags) return false unless tags.all? { |tag| properties.tag_keys.include?(tag) } true end # Range finder helper for #ranges_with_all_tags # # @param properties [Contrast::Agent::Assess::Properties] the properties to check for the tags # @param required_tags [Set] the list of tags on which to match # @return [Array] of required tags ranges to search def find_required_ranges properties, required_tags start_range = 0 end_range = 0 required_tags_arr = required_tags.to_a idx = 0 while idx < required_tags_arr.length # find the start and end range of required tags: start_temp = properties.fetch_tag(required_tags_arr[idx])[0].start_idx end_temp = properties.fetch_tag(required_tags_arr[idx])[0].end_idx # first iteration only start_range = start_temp if idx.zero? end_range = end_temp if idx.zero? # find the tag with smallest ranges start_range = start_temp if start_range < start_temp end_range = end_temp if end_range > end_temp idx += 1 end [start_range, end_range] end end end end end end