# Copyright (c) 2020 Contrast Security, Inc. See https://www.contrastsecurity.com/enduser-terms-0317a for more details.
# frozen_string_literal: true

cs__scoped_require 'contrast/agent/assess/policy/trigger/reflected_xss'
cs__scoped_require 'contrast/agent/assess/policy/trigger/xpath'
cs__scoped_require 'contrast/api/decorators/trace_taint_range_tags'

module Contrast
  module Agent
    module Assess
      module Policy
        # This class functions to translate our policy.json into an actionable
        # Ruby object, allowing for dynamic patching over hardcoded patching,
        # specifically for those methods which result in the trigger of a
        # vulnerability (indicate points in the application where uncontrolled
        # user input can do damage).
        class TriggerNode < PolicyNode
          JSON_BAD_VALUE = 'bad_value'
          JSON_GOOD_VALUE = 'good_value'
          JSON_DISALLOWED_TAGS = 'disallowed_tags'
          JSON_REQUIRED_TAGS = 'required_tags'
          JSON_RULE_NAME = 'name'
          JSON_CUSTOM_PATCH = 'custom_patch'

          attr_reader :rule_id, :required_tags, :disallowed_tags, :good_value, :bad_value

          # Build a trigger node from its policy definition.
          #
          # @param trigger_hash [Hash] the trigger entry; read here for the
          #   good/bad value patterns, the custom patch flag and the optional
          #   custom trigger class & method. It is also handed to the parent
          #   initializer.
          # @param rule_hash [Hash] the rule entry; read here for the rule
          #   name, the dataflow flag and the required / disallowed tags.
          # @raise [KeyError] if rule_hash has no JSON_RULE_NAME entry
          # @raise [ArgumentError] if a tag is unknown or validation fails
          def initialize trigger_hash = {}, rule_hash = {}
            super(trigger_hash)

            good_value = trigger_hash[JSON_GOOD_VALUE]
            bad_value = trigger_hash[JSON_BAD_VALUE]
            # the truthy second argument makes the patterns case-insensitive
            @good_value = Regexp.new(good_value, true) if good_value
            @bad_value = Regexp.new(bad_value, true) if bad_value

            @custom_patch = trigger_hash.fetch(JSON_CUSTOM_PATCH, false)

            @rule_id = rule_hash.fetch(JSON_RULE_NAME) # raises KeyError exception if not found
            @dataflow = rule_hash.fetch(JSON_DATAFLOW, true)
            # This must be computed after @dataflow is assigned. Evaluating it
            # earlier reads an unset @dataflow, which flags every rule with a
            # good/bad value as a regexp rule, dataflow or not.
            # NOTE(review): assumes PolicyNode#initialize does not itself
            # assign @dataflow -- confirm against the parent class.
            @regexp = !@dataflow && (@good_value || @bad_value)

            # rule_id and @dataflow must already be set: the populate methods
            # read both (the disallowed tags are built from loud_name).
            @required_tags = populate_tags(rule_hash[JSON_REQUIRED_TAGS])
            @disallowed_tags = populate_disallowed(rule_hash[JSON_DISALLOWED_TAGS])

            @trigger_class = trigger_hash['trigger_class']
            @trigger_method = trigger_hash['trigger_method']
            @trigger_method = @trigger_method.to_sym if @trigger_method

            validate
          end

          TRIGGER = 'Trigger'

          # @return [String] the constant label for this kind of node
          def node_class
            TRIGGER
          end

          # Forward the invocation to the custom trigger method configured for
          # this node, passing every argument through unchanged.
          #
          # @return [Object] whatever the custom trigger method returns
          def apply_custom_trigger context, trigger_node, source, object, ret, *args
            custom_trigger_class.send(@trigger_method, context, trigger_node, source, object, ret, *args)
          end

          # Resolve, and memoize, the constant named by the trigger_class
          # policy entry.
          def custom_trigger_class
            @_custom_trigger_class ||= Object.cs__const_get(@trigger_class)
          end

          # @return [Object, nil] truthy only when both a custom trigger class
          #   and a custom trigger method were configured
          def custom_trigger?
            @trigger_class && @trigger_method
          end

          # @return [Object] the custom_patch policy value; false when absent
          def custom_patch?
            @custom_patch
          end

          def node_type
            :TYPE_METHOD
          end

          # Ask ASSESS whether the rule backing this trigger is disabled.
          def rule_disabled?
            ASSESS.rule_disabled?(rule_id)
          end

          # Indicate if this is a dataflow based trigger, meaning it has a proper
          # synch that requires a tainted source to reach it. If this returns
          # false, this rule is for method validation, ensuring that an insecure
          # method, such as a non-cryptographically secure random, is not invoked
          def dataflow?
            @dataflow
          end

          # Indicate if this is a regexp based trigger, meaning it has a proper
          # synch that requires a source to reach it. While this type of rule
          # does not require the source to be tainted, it does validate it with
          # a regular expression to determine if the method is being invoked
          # safely.
          #
          # @return [Regexp, false, nil] truthy only for a non-dataflow rule
          #   that has a good or bad value pattern
          def regexp_rule?
            @regexp
          end

          # the name of the rule, in capital & underscore format
          # used to make it match enum things in TeamServer
          def loud_name
            @_loud_name ||= rule_id.upcase.gsub(Contrast::Utils::ObjectShare::DASH,
                                                Contrast::Utils::ObjectShare::UNDERSCORE)
          end

          # Determine if a dataflow rule violation has occurred
          #
          # @param source [Object] the object that reached the trigger
          # @return [Boolean] true if some range carrying all the required tags
          #   is not covered by a range carrying a disallowed tag
          def violated? source
            # if the source isn't tracked, there can't be a violation
            # this condition may not hold true forever, but for now it's
            # a nice optimization
            return false unless source.cs__tracked?

            # find the ranges that violate the rule (untrusted, etc)
            vulnerable_ranges = find_ranges_by_all_tags(Contrast::Utils::StringUtils.ret_length(source),
                                                        source.cs__properties,
                                                        required_tags)
            # if there aren't any vulnerable ranges, nope out
            return false if vulnerable_ranges.empty?

            # find the ranges that are exempt from the rule
            # (validated, sanitized, etc)
            secure_ranges = find_ranges_by_any_tag(source.cs__properties, disallowed_tags)
            # if there are vulnerable ranges and no secure, report
            return true if secure_ranges.empty?

            # figure out if there are any vulnerable ranges that aren't
            # covered by a secure one. if there are, the rule was violated
            !Contrast::Utils::TagUtil.covered?(vulnerable_ranges, secure_ranges)
          end

          # Standard validation + TS trace version two rules:
          # Must have source
          #
          # @raise [ArgumentError] if this is a dataflow rule without sources
          def validate
            super
            # If this isn't a dataflow rule, it can't have a source
            return unless dataflow?

            raise(ArgumentError, "Trigger #{ id } did not have a proper source. Unable to create.") unless sources&.any?
          end

          private

          # By default, any rule will be triggered if the source
          # of the rule event has an untrusted tag range that is
          # not covered by one of its disallowed tags.
          UNTRUSTED = 'UNTRUSTED'

          # Build the set of tags a source must carry to violate this rule.
          #
          # @param required_tags [Array<String>, nil] tags from the rule entry
          # @return [Set, nil] the given tags plus UNTRUSTED; nil for a
          #   non-dataflow rule
          # @raise [ArgumentError] if any given tag is not a known value
          def populate_tags required_tags
            return unless dataflow?

            validate_rule_tags(required_tags)
            @required_tags = Set.new(required_tags)
            @required_tags << UNTRUSTED
          end

          ENCODER_START = 'CUSTOM_ENCODED_'
          VALIDATOR_START = 'CUSTOM_VALIDATED_'
          # If a level 1 rule comes from TeamServer, it will have the
          # tag 'custom-encoder-#{ name }' or 'custom-validator-#{ name }'.
          # All rules should take this into account.
          # Additionally, if something is marked 'limited-chars' it means
          # it has been properly vetted to not contain dangerous input.
          LIMITED_CHARS = 'LIMITED_CHARS'
          CUSTOM_ENCODED = 'CUSTOM_ENCODED'
          CUSTOM_VALIDATED = 'CUSTOM_VALIDATED'

          # Build the set of tags that exempt a range from this rule.
          #
          # @param disallowed_tags [Array<String>, nil] tags from the rule entry
          # @return [Set, nil] the given tags plus the generic and the
          #   rule-specific encoder / validator tags; nil for a non-dataflow
          #   rule
          # @raise [ArgumentError] if any given tag is not a known value
          def populate_disallowed disallowed_tags
            return unless dataflow?

            validate_rule_tags(disallowed_tags)

            @disallowed_tags = Set.new(disallowed_tags)
            @disallowed_tags << LIMITED_CHARS
            @disallowed_tags << CUSTOM_ENCODED
            @disallowed_tags << CUSTOM_VALIDATED
            @disallowed_tags << ENCODER_START + loud_name
            @disallowed_tags << VALIDATOR_START + loud_name
          end

          # Ensure every tag is listed in either VALID_TAGS or
          # VALID_SOURCE_TAGS of TraceTaintRangeTags.
          #
          # @param tags [Array<String>, nil] the tags to check; nil is accepted
          # @raise [ArgumentError] on the first tag found in neither list
          def validate_rule_tags tags
            return unless tags

            tags.each do |tag|
              known = Contrast::Api::Decorators::TraceTaintRangeTags::VALID_TAGS.include?(tag) ||
                  Contrast::Api::Decorators::TraceTaintRangeTags::VALID_SOURCE_TAGS.include?(tag)
              raise(ArgumentError, "Rule #{ rule_id } had an invalid tag. #{ tag } is not a known value.") unless known
            end
          end

          # Find the ranges that satisfy all of the given tags.
          #
          # @param length [Integer] the length of the object which may have the
          #   given tags -- used as the maximum index to search for all of the
          #   tags.
          # @param cs__properties [Contrast::Agent::Assess::Properties] the
          #   properties to check for the tags
          # @param tags [Set] the list of tags on which to match
          # @return [Array] the ranges satisfied
          #   by the given conditions
          def find_ranges_by_all_tags length, cs__properties, tags
            # if there aren't any all_tags or tags, break early
            return Contrast::Utils::ObjectShare::EMPTY_ARRAY unless cs__properties.tracked?
            return Contrast::Utils::ObjectShare::EMPTY_ARRAY unless tags&.any?

            # :zap: faster to treat all as any if there's only one tag
            return find_ranges_by_any_tag(cs__properties, tags) if tags.length == 1

            # TODO: RUBY-946 clean this up, perhaps with
            # tags.each { |tag| applicable << cs__properties.fetch_tag(tag) }
            # return Contrast::Utils::ObjectShare::EMPTY_ARRAY unless applicable.length == tags.length
            # ...

            # find all the indices on the source that have all the given tags
            # NOTE(review): the range is inclusive, so index `length` itself is
            # probed as well -- confirm that tags_at is harmless there.
            indices = (0..length).select do |idx|
              tags_at = cs__properties.tags_at(idx)
              tags.all? { |tag| tags_at.any? { |tag_at| tag_at.label == tag } }
            end

            # break early if no indices satisfy all the tags
            return Contrast::Utils::ObjectShare::EMPTY_ARRAY if indices.empty?

            # chunk all the adjacent indices and convert each run into a Tag
            tag_ranges = []
            indices.chunk_while { |i, j| i + 1 == j }.each do |run|
              start = run[0]
              stop = run[-1]
              # add the 1 to account for end index being exclusive
              tag_length = stop - start + 1
              # NOTE(review): Tag is resolved from the enclosing namespace;
              # its constructor signature is not visible from this file.
              tag_ranges = Contrast::Utils::TagUtil.ordered_merge(tag_ranges, Tag.new(tag_length, start))
            end
            tag_ranges
          end

          # Find the ranges that satisfy any of the given tags.
          #
          # @param cs__properties [Contrast::Agent::Assess::Properties] the
          #   properties to check for the tags
          # @param tags [Set] the list of tags on which to match
          # @return [Array] the ranges satisfied
          #   by the given conditions
          def find_ranges_by_any_tag cs__properties, tags
            # if there aren't any all_tags or tags, break early
            return Contrast::Utils::ObjectShare::EMPTY_ARRAY unless cs__properties.tracked?
            return Contrast::Utils::ObjectShare::EMPTY_ARRAY unless tags&.any?

            ranges = []
            tags.each do |desired|
              found = cs__properties.fetch_tag(desired)
              next unless found

              # we need to dup here so that we don't change the tags if target is
              # used in another trace
              ranges = Contrast::Utils::TagUtil.ordered_merge(ranges, found.dup)
            end
            ranges
          end
        end
      end
    end
  end
end