# Copyright (c) 2020 Contrast Security, Inc. See https://www.contrastsecurity.com/enduser-terms-0317a for more details.
# frozen_string_literal: true

cs__scoped_require 'contrast/components/interface'
cs__scoped_require 'contrast/utils/object_share'

module Contrast
  module CoreExtensions
    module Protect
      # This Module is how we apply the XXE rule. It is called from our patches
      # of the targeted methods in which XML parsing and entity resolution
      # occurs. It is responsible for deciding if the infilter methods of the
      # rule should be invoked.
      module AppliesXxeRule
        include Contrast::Components::Interface
        access_component :logging, :analysis

        class << self
          # Run the XXE rule against XML handed to a parser as its first
          # argument.
          #
          # @param _method [Object] unused
          # @param _exception [Object] unused
          # @param _properties [Object] unused
          # @param object [Object] the receiver of the patched call; its class
          #   (or the object itself, if it is a Module) selects the parser name
          # @param args [Array, nil] arguments of the patched call; the first
          #   one is passed to the rule as the XML
          # @raise [Contrast::SecurityException] re-raised untouched when the
          #   rule raises it; any other StandardError is logged and swallowed
          def apply_xxe_rule _method, _exception, _properties, object, args
            return unless valid_input?(args)
            return if skip_analysis?

            xml = args.first
            parser = determine_parser(object)
            return unless parser

            rule.infilter(Contrast::Agent::REQUEST_TRACKER.current, parser, xml)
          rescue Contrast::SecurityException => e
            raise e
          rescue StandardError => e
            # parser is nil here if the failure happened before it was resolved
            parser ||= Contrast::Utils::ObjectShare::UNKNOWN
            logger.error(e, "Error running XXE rule in #{ parser }")
          end

          # IO is tricky. If we can't rewind it, we can't fix it back to the
          # original state. To be safe, we'll skip non-rewindable IO objects.
          #
          # The first argument is read in full, handed to the rule, and then
          # rewound in the ensure so the caller sees it from the start again.
          #
          # @param _method [Object] unused
          # @param _exception [Object] unused
          # @param _properties [Object] unused
          # @param object [Object] the receiver of the patched call
          # @param args [Array, nil] arguments of the patched call; the first
          #   one must respond to both #read and #rewind to be analyzed
          # @raise [Contrast::SecurityException] re-raised untouched when the
          #   rule raises it; any other StandardError is logged and swallowed
          def apply_xxe_rule__io _method, _exception, _properties, object, args
            need_rewind = false
            return unless valid_io_input?(args)
            return if skip_analysis?

            # From here on the stream position may have moved, so the ensure
            # below has to put it back.
            need_rewind = true
            xml = args.first.read
            parser = determine_parser(object)
            return unless parser

            rule.infilter(Contrast::Agent::REQUEST_TRACKER.current, parser, xml)
          rescue Contrast::SecurityException => e
            raise e
          rescue StandardError => e
            parser ||= Contrast::Utils::ObjectShare::UNKNOWN
            logger.error(e, "Error running XXE rule in #{ parser }")
          ensure
            args.first.rewind if need_rewind
          end

          # Oga's Lexer is a special case b/c the information we need is on the
          # object itself -- specifically in the @data instance variable
          #
          # A String is checked in one call. Anything else is checked chunk by
          # chunk through #each_line or #each and rewound afterwards, even when
          # the check fails part way through. As with the IO variant above,
          # sources that are read through #each_line but cannot be rewound are
          # skipped rather than consumed.
          #
          # @param _method [Object] unused
          # @param _exception [Object] unused
          # @param _properties [Object] unused
          # @param object [Object] the lexer instance holding @data
          # @param _args [Object] unused
          # @raise [Contrast::SecurityException] re-raised untouched when the
          #   rule raises it; any other StandardError is logged and swallowed
          def apply_xxe_rule__lexer _method, _exception, _properties, object, _args
            return unless valid_data_input?(object)
            return if skip_analysis?

            parser = determine_parser(object)
            return unless parser

            data = object.instance_variable_get(DATA_KEY)
            if data.cs__is_a?(String)
              rule.infilter(Contrast::Agent::REQUEST_TRACKER.current, parser, data)
            elsif data.cs__respond_to?(:each_line)
              # Reading lines moves the position of a stream; only do so when
              # we are able to move it back.
              infilter_chunks(data, :each_line, parser) if data.cs__respond_to?(:rewind)
            elsif data.cs__respond_to?(:each)
              infilter_chunks(data, :each, parser)
            end
          rescue Contrast::SecurityException => e
            raise e
          rescue StandardError => e
            parser ||= Contrast::Utils::ObjectShare::UNKNOWN
            logger.error(e, "Error running XXE rule in #{ parser }")
          end

          private

          # @return [Object] the XXE rule looked up by name on PROTECT
          def rule
            PROTECT.rule Contrast::Agent::Protect::Rule::Xxe::NAME
          end

          # @param args [Array, nil] arguments of the patched call
          # @return [Object] truthy only if args has a truthy first element
          def valid_input? args
            return false unless args&.any?

            args.first
          end

          # @param args [Array, nil] arguments of the patched call
          # @return [Boolean] true only if the first argument can be both read
          #   and rewound, which is everything #apply_xxe_rule__io does to it
          def valid_io_input? args
            return false unless valid_input?(args)

            io = args.first
            io.cs__respond_to?(:rewind) && io.cs__respond_to?(:read)
          end

          DATA_KEY = :@data

          # @param object [Object] the candidate lexer
          # @return [Object] truthy only if object has a non-nil, non-false
          #   @data instance variable
          def valid_data_input? object
            object.instance_variable_defined?(DATA_KEY) && object.instance_variable_get(DATA_KEY)
          end

          # @return [Boolean] true when there is no current request context,
          #   the context reports the application as not loaded, or the rule
          #   is missing or disabled
          def skip_analysis?
            context = Contrast::Agent::REQUEST_TRACKER.current
            return true unless context&.app_loaded?
            return true unless rule&.enabled?

            false
          end

          # Hand every chunk yielded by data's iterator to the rule, then put
          # data back where it started. The rewind sits in an ensure so that a
          # failure part way through does not leave a half-read stream behind
          # for the real parser.
          #
          # @param data [Object] the source to iterate
          # @param iterator [Symbol] :each_line or :each
          # @param parser [String] the parser name passed to the rule
          def infilter_chunks data, iterator, parser
            data.public_send(iterator) do |chunk|
              rule.infilter(Contrast::Agent::REQUEST_TRACKER.current, parser, chunk)
            end
          ensure
            restore_position(data)
          end

          # Rewind data if it supports it. A failure to rewind is logged here
          # rather than raised, so that it cannot replace an exception that is
          # already propagating out of the rule.
          #
          # @param data [Object] the source that was iterated
          def restore_position data
            data.rewind if data.cs__respond_to?(:rewind)
          rescue StandardError => e
            logger.error(e, 'Error rewinding XML input after running XXE rule')
          end

          NOKOGIRI_MARKER = 'Nokogiri::'
          PARSER_NOKOGIRI = 'Nokogiri'
          OX_MARKER = 'Ox' # breaks marker pattern b/c Ox is entire classname
          PARSER_OX = 'Ox'
          OGA_MARKER = 'Oga::'
          PARSER_OGA = 'Oga'

          # Map the receiver of a patched call to the name of the XML library
          # it belongs to, based on the prefix of its class (or module) name.
          #
          # @param object [Object] a Module, or an instance whose class is used
          # @return [String, nil] one of the PARSER_* names, or nil when the
          #   name is missing or matches none of the markers
          def determine_parser object
            clazz = object.cs__is_a?(Module) ? object : object.cs__class
            name = clazz.cs__name
            # NOTE(review): presumably nil for anonymous classes/modules, as
            # with Module#name -- nothing to match against in that case.
            return unless name

            if name.start_with?(NOKOGIRI_MARKER)
              PARSER_NOKOGIRI
            elsif name.start_with?(OX_MARKER)
              PARSER_OX
            elsif name.start_with?(OGA_MARKER)
              PARSER_OGA
            end
          end
        end
      end
    end
  end
end