# Copyright (c) 2023 Contrast Security, Inc. See https://www.contrastsecurity.com/enduser-terms-0317a for more details. # frozen_string_literal: true require 'contrast/utils/lru_cache' require 'contrast/utils/duck_utils' require 'contrast/agent/protect/rule/input_classification/cached_result' require 'contrast/agent/protect/rule/input_classification/statistics' require 'contrast/agent/protect/rule/input_classification/utils' module Contrast module Agent module Protect module Rule module InputClassification # This Class with store the input classification results for a given user input. # Among the most used inputs are the headers values for session_id and path values. class LRUCache < Contrast::Utils::LRUCache include Contrast::Components::Logger::InstanceMethods include Contrast::Agent::Protect::Rule::InputClassification::Utils # Protect rules will always be fixed number, on other hand the number of inputs will grow, # we need to limit the number of inputs to be cached. RESULTS_CAPACITY = 20 private :[], :[]= # Initialize the LRU Cache. # # @param capacity [Integer] the maximum number of elements to store in the cache. For this # instance it will never reach outside of the number of supported Protect rules. def initialize capacity = 10 super(capacity) end def mutex @_mutex ||= Mutex.new end def with_mutex &block return_type = mutex.synchronize(&block) ensure return_type end # Capacity of the statistics will always be the number of rule_id Protect supports. # # @return [Contrast::Agent::Protect::Rule::InputClassification::Statistics] def statistics @_statistics ||= Contrast::Agent::Protect::Rule::InputClassification::Statistics.new end # Clear the cache and statistics. def clear with_mutex { @cache.clear } clear_statistics end # Clear only the statistics. def clear_statistics with_mutex { statistics.send(:clear) } end # Check if the cache is empty. # # @return [Boolean] def empty? Contrast::Utils::DuckUtils.empty_duck?(@cache) end # Check if the input is cached and returns it if so and record the statistics for the required input. # # @param rule_id [String] the Protect rule name. # @param input [String] the user input. # @param input_type [Symbol] Type of the input # @param request [Contrast::Agent::Request] the current request. # @return [Contrast::Agent::Protect::InputClassification::CachedResult, nil] def lookout rule_id, input, input_type, request with_mutex { _loockout(rule_id, input, input_type, request) } end # Save the input classification result for a given user input. # # @param rule_id [String] the Protect rule name. # @param result [Contrast::Agent::Reporting::InputAnalysisResult] # @param request [Contrast::Agent::Request] the current request. # @return result [Contrast::Agent::Protect::InputClassification::CachedResult, nil] def save rule_id, result, request with_mutex { _save(rule_id, result, request) } end private # Check if the input is cached and returns it if so and record the statistics for the required input. # # @param rule_id [String] the Protect rule name. # @param input [String] the user input. # @param input_type [Symbol] Type of the input # @param request [Contrast::Agent::Request] the current request. # @return [Contrast::Agent::Protect::InputClassification::CachedResult, nil] def _loockout rule_id, input, input_type, request cached = retrieve(rule_id, input, input_type) if cached.cs__is_a?(Contrast::Agent::Protect::InputClassification::CachedResult) # Telemetry event matched. statistics.match!(rule_id, cached, request) cached else # Telemetry event mismatched. statistics.mismatch!(rule_id, input_type) nil end rescue StandardError => e logger.error("[IA_LRU_Cache] Error while looking for #{ input_type } for #{ rule_id }", error: e) nil end # Save the input classification result for a given user input. # # @param rule_id [String] the Protect rule name. # @param result [Contrast::Agent::Reporting::InputAnalysisResult] # @param request [Contrast::Agent::Request] the current request. # @return result [Contrast::Agent::Protect::InputClassification::CachedResult, nil] def _save rule_id, result, request cached_result = Contrast::Agent::Protect::InputClassification::CachedResult.new(result, request.__id__) new_entry = safe_extract(push(rule_id, cached_result)) unless cached_result.empty? statistics.push(rule_id, cached_result) new_entry rescue StandardError => e logger.error("[IA_LRU_Cache] Error while saving #{ result } for #{ rule_id }", error: e, stack_trace: e.backtrace) end # @param rule_id [String] the Protect rule name. # @param result [Contrast::Agent::Protect::InputClassification::CachedResult, nil] # @return [Array, nil] def push rule_id, result return unless result.cs__is_a?(Contrast::Agent::Protect::InputClassification::CachedResult) return @cache[rule_id] = [result] unless key?(rule_id) cached_results = @cache[rule_id] cached_results.shift if cached_results.length >= RESULTS_CAPACITY return @cache[rule_id] << result unless already_saved?(result, rule_id) nil end # Returns the cached result for a given user input. If the input is not cached, it will return nil. # If the input is cached and key is needed, it will return the cached result and the key and key_type. # # @param rule_id [String] the Protect rule name. # @param input [String] the user input. # @param input_type [Symbol] Type of the input # @return [Contrast::Agent::Protect::InputClassification::CachedResult, nil] def retrieve rule_id, input, input_type # Check to see if cache exist if not just return the keys info. return unless key?(rule_id) safe_extract(query_fetch(rule_id, input, input_type)) end # returns true on first input already saved with the same input. # We will know if the key is needed from the result itself. # # @param cached_result [Contrast::Agent::Protect::InputClassification::CachedResult, nil] # @param rule_id [String] # @return [Boolean] def already_saved? cached_result, rule_id return false unless cached_result.cs__is_a?(Contrast::Agent::Protect::InputClassification::CachedResult) @cache[rule_id].any? do |entry| if entry.result.value == cached_result.result.value && entry.result.input_type == cached_result.result.input_type return true end end false end # Returns first match for the required input. # # @param rule_id [String] the Protect rule name. # @param input [String] the user input. # @param input_type [Symbol] Type of the input # @return result [Contrast::Agent::Protect::InputClassification::CachedResult, nil] def query_fetch rule_id, input, input_type @cache[rule_id].map do |cached_result| return cached_result if cached_result.result.value == input && cached_result.result.input_type == input_type end end end end end end end end