# Copyright (c) 2020 Contrast Security, Inc. See https://www.contrastsecurity.com/enduser-terms-0317a for more details. # frozen_string_literal: true require 'contrast/agent/assess/policy/trigger/reflected_xss' require 'contrast/agent/assess/policy/trigger/xpath' require 'contrast/api/decorators/trace_taint_range_tags' module Contrast module Agent module Assess module Policy # This class functions to translate our policy.json into an actionable # Ruby object, allowing for dynamic patching over hardcoded patching, # specifically for those methods which result in the trigger of a # vulnerability (indicate points in the application where uncontrolled # user input can do damage). class TriggerNode < PolicyNode JSON_BAD_VALUE = 'bad_value' JSON_GOOD_VALUE = 'good_value' JSON_DISALLOWED_TAGS = 'disallowed_tags' JSON_REQUIRED_TAGS = 'required_tags' JSON_RULE_NAME = 'name' JSON_CUSTOM_PATCH = 'custom_patch' attr_reader :rule_id, :required_tags, :disallowed_tags, :good_value, :bad_value def initialize trigger_hash = {}, rule_hash = {} super(trigger_hash) good_value = trigger_hash[JSON_GOOD_VALUE] bad_value = trigger_hash[JSON_BAD_VALUE] @good_value = Regexp.new(good_value, true) if good_value @bad_value = Regexp.new(bad_value, true) if bad_value @regexp = !@dataflow && (@good_value || @bad_value) @custom_patch = trigger_hash.fetch(JSON_CUSTOM_PATCH, false) @rule_id = rule_hash.fetch(JSON_RULE_NAME) # raises KeyError exception if not found @dataflow = rule_hash.fetch(JSON_DATAFLOW, true) @required_tags = populate_tags(rule_hash[JSON_REQUIRED_TAGS]) @disallowed_tags = populate_disallowed(rule_hash[JSON_DISALLOWED_TAGS]) @trigger_class = trigger_hash['trigger_class'] @trigger_method = trigger_hash['trigger_method'] @trigger_method = @trigger_method.to_sym if @trigger_method validate end TRIGGER = 'Trigger' def node_class TRIGGER end def apply_custom_trigger context, trigger_node, source, object, ret, *args custom_trigger_class.send(@trigger_method, context, trigger_node, source, object, ret, *args) end def custom_trigger_class @_custom_trigger_class ||= Object.cs__const_get(@trigger_class) end def custom_trigger? @trigger_class && @trigger_method end def custom_patch? @custom_patch end def node_type :TYPE_METHOD end def rule_disabled? ASSESS.rule_disabled?(rule_id) end # Indicate if this is a dataflow based trigger, meaning it has a proper # synch that requires a tainted source to reach it. If this returns # false, this rule is for method validation, ensuring that an insecure # method, such as a non-cryptographically secure random, is not invoked def dataflow? @dataflow end # Indicate if this is a regexp based trigger, meaning it has a proper # synch that requires a source to reach it. While this type of rule # does not require the source to be tainted, it does validate it with # a regular expression to determine if the method is being invoked # safely. def regexp_rule? @regexp end # the name of the rule, in capital & underscore format # used to make it match enum things in TeamServer def loud_name @_loud_name ||= rule_id.upcase.gsub(Contrast::Utils::ObjectShare::DASH, Contrast::Utils::ObjectShare::UNDERSCORE) end # Determine if a dataflow rule violation has occurred def violated? source # if the source isn't tracked, there can't be a violation # this condition may not hold true forever, but for now it's # a nice optimization return false unless Contrast::Agent::Assess::Tracker.tracked?(source) properties = Contrast::Agent::Assess::Tracker.properties(source) # find the ranges that violate the rule (untrusted, etc) vulnerable_ranges = ranges_with_all_tags(Contrast::Utils::StringUtils.ret_length(source), properties, required_tags) # if there aren't any vulnerable ranges, nope out return false if vulnerable_ranges.empty? # find the ranges that are exempt from the rule # (validated, sanitized, etc) secure_ranges = ranges_with_any_tag(properties, disallowed_tags) # if there are vulnerable ranges and no secure, report return true if secure_ranges.empty? # figure out if there are any vulnerable ranges that aren't # covered by a secure one. if there are, the rule was violated !Contrast::Utils::TagUtil.covered?(vulnerable_ranges, secure_ranges) end # Standard validation + TS trace version two rules: # Must have source def validate super # If this isn't a dataflow rule, it can't have a source return unless dataflow? raise(ArgumentError, "Trigger #{ id } did not have a proper source. Unable to create.") unless sources&.any? end private # By default, any rule will be triggered if the source # of the rule event has an untrusted tag range that is # not covered by one of its disallowed tags. UNTRUSTED = 'UNTRUSTED' def populate_tags required_tags return unless dataflow? validate_rule_tags(required_tags) @required_tags = Set.new(required_tags) @required_tags << UNTRUSTED end ENCODER_START = 'CUSTOM_ENCODED_' VALIDATOR_START = 'CUSTOM_VALIDATED_' # If a level 1 rule comes from TeamServer, it will have the # tag 'custom-encoder-#{ name }' or 'custom-validator-#{ name }'. # All rules should take this into account. # Additionally, if something is marked 'limited-chars' it means # it has been properly vetted to not contain dangerous input. LIMITED_CHARS = 'LIMITED_CHARS' CUSTOM_ENCODED = 'CUSTOM_ENCODED' CUSTOM_VALIDATED = 'CUSTOM_VALIDATED' def populate_disallowed disallowed_tags return unless dataflow? validate_rule_tags(disallowed_tags) @disallowed_tags = Set.new(disallowed_tags) @disallowed_tags << LIMITED_CHARS @disallowed_tags << CUSTOM_ENCODED @disallowed_tags << CUSTOM_VALIDATED @disallowed_tags << ENCODER_START + loud_name @disallowed_tags << VALIDATOR_START + loud_name end def validate_rule_tags tags return unless tags tags.each do |tag| raise(ArgumentError, "Rule #{ rule_id } had an invalid tag. #{ tag } is not a known value.") unless Contrast::Api::Decorators::TraceTaintRangeTags::VALID_TAGS.include?(tag) || Contrast::Api::Decorators::TraceTaintRangeTags::VALID_SOURCE_TAGS.include?(tag) end end # Find the ranges that satisfy all of the given tags. # # @param length [Integer] the length of the object which may have the # given tags -- used as the maximum index to search for all of the # tags. # @param properties [Contrast::Agent::Assess::Properties] the # properties to check for the tags # @param required_tags [Set] the list of tags on which to match # @return [Array] the ranges satisfied # by the given conditions def ranges_with_all_tags length, properties, required_tags return Contrast::Utils::ObjectShare::EMPTY_ARRAY unless matches_tags?(properties, required_tags) ranges = [] chunking = false # find all the indicies on the source that have all the given tags (0..length).each do |idx| tags_at = properties.tags_at(idx).to_a # only those that have all the required tags in the tags_at # satisfy the requirement satisfied = tags_at.any? && required_tags.all? { |tag| tags_at.any? { |found| found.label == tag } } # if this range matches all the required tags and we're already # chunking, meaning the previous range matched, do nothing next if satisfied && chunking # if we are satisfied and we were not chunking, this represents # the start of the next range, so create a new entry. if satisfied ranges << Contrast::Agent::Assess::Tag.new('required', 0, idx) chunking = true # if we are chunking and not satisfied, this represents the end # of the range, meaning the last index is what satisfied the # range. Because the range is exclusive end, we can just use this # index. elsif chunking ranges[-1]&.update_end(idx) chunking = false end end ranges end # Find the ranges that satisfy any of the given tags. # # @param properties [Contrast::Agent::Assess::Properties] the # properties to check for the tags # @param tags [Set] the list of tags on which to match # @return [Array] the ranges satisfied # by the given conditions def ranges_with_any_tag properties, tags # if there aren't any all_tags or tags, break early return Contrast::Utils::ObjectShare::EMPTY_ARRAY unless search_tags?(properties, tags) ranges = [] tags.each do |desired| found = properties.fetch_tag(desired) next unless found # we need to dup here so that we don't change the tags if target is # used in another trace ranges = Contrast::Utils::TagUtil.ordered_merge(ranges, found.dup) end ranges end # We should only try to match tags on properties if those properties have any tags (are tracked) and there # are tags to try and match on. Some rules, like regexp rules, have no tags. Some rules, like trigger, have # no properties. # # @param properties [Contrast::Agent::Assess::Properties] the properties to check for the tags # @param tags [Set] the list of tags on which to match # @return [Boolean] if the given properties has instances of every tag in tags def search_tags? properties, tags return false unless properties.tracked? return false unless tags&.any? true end # Determine if the given properties have instances of all the given tags or not. # # @param properties [Contrast::Agent::Assess::Properties] the properties to check for the tags # @param tags [Set] the list of tags on which to match # @return [Boolean] if the given properties has instances of every tag in tags def matches_tags? properties, tags return false unless search_tags?(properties, tags) return false unless tags.all? { |tag| properties.tag_keys.include?(tag) } true end end end end end end