lib/active_support/cache.rb in activesupport-7.0.8.6 vs lib/active_support/cache.rb in activesupport-7.1.0.beta1

- old
+ new

@@ -1,17 +1,18 @@ # frozen_string_literal: true require "zlib" require "active_support/core_ext/array/extract_options" -require "active_support/core_ext/array/wrap" require "active_support/core_ext/enumerable" require "active_support/core_ext/module/attribute_accessors" require "active_support/core_ext/numeric/bytes" -require "active_support/core_ext/numeric/time" require "active_support/core_ext/object/to_param" require "active_support/core_ext/object/try" require "active_support/core_ext/string/inflections" +require_relative "cache/coder" +require_relative "cache/entry" +require_relative "cache/serializer_with_fallback" module ActiveSupport # See ActiveSupport::Cache::Store for documentation. module Cache autoload :FileStore, "active_support/cache/file_store" @@ -20,19 +21,35 @@ autoload :NullStore, "active_support/cache/null_store" autoload :RedisCacheStore, "active_support/cache/redis_cache_store" # These options mean something to all cache implementations. Individual cache # implementations may support additional options. - UNIVERSAL_OPTIONS = [:namespace, :compress, :compress_threshold, :expires_in, :expire_in, :expired_in, :race_condition_ttl, :coder, :skip_nil] + UNIVERSAL_OPTIONS = [ + :coder, + :compress, + :compress_threshold, + :compressor, + :expire_in, + :expired_in, + :expires_in, + :namespace, + :race_condition_ttl, + :serializer, + :skip_nil, + ] - DEFAULT_COMPRESS_LIMIT = 1.kilobyte - # Mapping of canonical option names to aliases that a store will recognize. OPTION_ALIASES = { expires_in: [:expire_in, :expired_in] }.freeze + DEFAULT_COMPRESS_LIMIT = 1.kilobyte + + # Raised by coders when the cache entry can't be deserialized. + # This error is treated as a cache miss. + DeserializationError = Class.new(StandardError) + module Strategy autoload :LocalCache, "active_support/cache/strategy/local_cache" end @format_version = 6.1 @@ -130,10 +147,12 @@ else ActiveSupport::Cache.const_get(store.to_s.camelize) end end + # = Active Support \Cache \Store + # # An abstract cache store class. There are multiple cache store # implementations, each having its own additional features. See the classes # under the ActiveSupport::Cache module, e.g. # ActiveSupport::Cache::MemCacheStore. MemCacheStore is currently the most # popular cache store for large production websites. @@ -172,50 +191,150 @@ # cache.namespace = -> { @last_mod_time } # Set the namespace to a variable # @last_mod_time = Time.now # Invalidate the entire cache by changing namespace # class Store cattr_accessor :logger, instance_writer: true + cattr_accessor :raise_on_invalid_cache_expiration_time, default: false attr_reader :silence, :options alias :silence? :silence class << self private + DEFAULT_POOL_OPTIONS = { size: 5, timeout: 5 }.freeze + private_constant :DEFAULT_POOL_OPTIONS + def retrieve_pool_options(options) - {}.tap do |pool_options| - pool_options[:size] = options.delete(:pool_size) if options[:pool_size] - pool_options[:timeout] = options.delete(:pool_timeout) if options[:pool_timeout] + if options.key?(:pool) + pool_options = options.delete(:pool) + elsif options.key?(:pool_size) || options.key?(:pool_timeout) + pool_options = {} + + if options.key?(:pool_size) + ActiveSupport.deprecator.warn(<<~MSG) + Using :pool_size is deprecated and will be removed in Rails 7.2. + Use `pool: { size: #{options[:pool_size].inspect} }` instead. + MSG + pool_options[:size] = options.delete(:pool_size) + end + + if options.key?(:pool_timeout) + ActiveSupport.deprecator.warn(<<~MSG) + Using :pool_timeout is deprecated and will be removed in Rails 7.2. + Use `pool: { timeout: #{options[:pool_timeout].inspect} }` instead. + MSG + pool_options[:timeout] = options.delete(:pool_timeout) + end + else + pool_options = true end - end - def ensure_connection_pool_added! - require "connection_pool" - rescue LoadError => e - $stderr.puts "You don't have connection_pool installed in your application. Please add it to your Gemfile and run bundle install" - raise e + case pool_options + when false, nil + return false + when true + pool_options = DEFAULT_POOL_OPTIONS + when Hash + pool_options[:size] = Integer(pool_options[:size]) if pool_options.key?(:size) + pool_options[:timeout] = Float(pool_options[:timeout]) if pool_options.key?(:timeout) + pool_options = DEFAULT_POOL_OPTIONS.merge(pool_options) + else + raise TypeError, "Invalid :pool argument, expected Hash, got: #{pool_options.inspect}" + end + + pool_options unless pool_options.empty? end end # Creates a new cache. # # ==== Options # - # * +:namespace+ - Sets the namespace for the cache. This option is - # especially useful if your application shares a cache with other - # applications. - # * +:coder+ - Replaces the default cache entry serialization mechanism - # with a custom one. The +coder+ must respond to +dump+ and +load+. - # Using a custom coder disables automatic compression. + # [+:namespace+] + # Sets the namespace for the cache. This option is especially useful if + # your application shares a cache with other applications. # + # [+:serializer+] + # The serializer for cached values. Must respond to +dump+ and +load+. + # + # The default serializer depends on the cache format version (set via + # +config.active_support.cache_format_version+ when using Rails). The + # default serializer for each format version includes a fallback + # mechanism to deserialize values from any format version. This behavior + # makes it easy to migrate between format versions without invalidating + # the entire cache. + # + # You can also specify <tt>serializer: :message_pack</tt> to use a + # preconfigured serializer based on ActiveSupport::MessagePack. The + # +:message_pack+ serializer includes the same deserialization fallback + # mechanism, allowing easy migration from (or to) the default + # serializer. The +:message_pack+ serializer may improve performance, + # but it requires the +msgpack+ gem. + # + # [+:compressor+] + # The compressor for serialized cache values. Must respond to +deflate+ + # and +inflate+. + # + # The default compressor is +Zlib+. To define a new custom compressor + # that also decompresses old cache entries, you can check compressed + # values for Zlib's <tt>"\x78"</tt> signature: + # + # module MyCompressor + # def self.deflate(dumped) + # # compression logic... (make sure result does not start with "\x78"!) + # end + # + # def self.inflate(compressed) + # if compressed.start_with?("\x78") + # Zlib.inflate(compressed) + # else + # # decompression logic... + # end + # end + # end + # + # ActiveSupport::Cache.lookup_store(:redis_cache_store, compressor: MyCompressor) + # + # [+:coder+] + # The coder for serializing and (optionally) compressing cache entries. + # Must respond to +dump+ and +load+. + # + # The default coder composes the serializer and compressor, and includes + # some performance optimizations. If you only need to override the + # serializer or compressor, you should specify the +:serializer+ or + # +:compressor+ options instead. + # + # If the store can handle cache entries directly, you may also specify + # <tt>coder: nil</tt> to omit the serializer, compressor, and coder. For + # example, if you are using ActiveSupport::Cache::MemoryStore and can + # guarantee that cache values will not be mutated, you can specify + # <tt>coder: nil</tt> to avoid the overhead of safeguarding against + # mutation. + # + # The +:coder+ option is mutally exclusive with the +:serializer+ and + # +:compressor+ options. Specifying them together will raise an + # +ArgumentError+. + # # Any other specified options are treated as default options for the # relevant cache operations, such as #read, #write, and #fetch. def initialize(options = nil) - @options = options ? normalize_options(options) : {} + @options = options ? validate_options(normalize_options(options)) : {} + @options[:compress] = true unless @options.key?(:compress) - @options[:compress_threshold] = DEFAULT_COMPRESS_LIMIT unless @options.key?(:compress_threshold) + @options[:compress_threshold] ||= DEFAULT_COMPRESS_LIMIT - @coder = @options.delete(:coder) { default_coder } || NullCoder + @coder = @options.delete(:coder) do + legacy_serializer = Cache.format_version < 7.1 && !@options[:serializer] + serializer = @options.delete(:serializer) || default_serializer + serializer = Cache::SerializerWithFallback[serializer] if serializer.is_a?(Symbol) + compressor = @options.delete(:compressor) { Zlib } + + Cache::Coder.new(serializer, compressor, legacy_serializer: legacy_serializer) + end + + @coder ||= Cache::SerializerWithFallback[:passthrough] + @coder_supports_compression = @coder.respond_to?(:dump_compressed) end # Silences the logger. def silence! @@ -316,22 +435,42 @@ # sleep 10 # First thread extended the life of cache by another 10 seconds # cache.fetch('foo') # => "new value 1" # val_1 # => "new value 1" # val_2 # => "original value" # + # ==== Dynamic Options + # + # In some cases it may be necessary to to dynamically compute options based + # on the cached value. For this purpose, a ActiveSupport::Cache::WriteOptions + # instance is passed as a second argument to the block + # + # cache.fetch("authentication-token:#{user.id}") do |key, options| + # token = authenticate_to_service + # options.expires_at = token.expires_at + # token + # end + # + # Only some options can be set dynamically: + # + # - +:expires_in+ + # - +:expires_at+ + # - +:version+ + # def fetch(name, options = nil, &block) if block_given? options = merged_options(options) key = normalize_key(name, options) entry = nil - instrument(:read, name, options) do |payload| - cached_entry = read_entry(key, **options, event: payload) unless options[:force] - entry = handle_expired_entry(cached_entry, key, options) - entry = nil if entry && entry.mismatched?(normalize_version(name, options)) - payload[:super_operation] = :fetch if payload - payload[:hit] = !!entry if payload + unless options[:force] + instrument(:read, name, options) do |payload| + cached_entry = read_entry(key, **options, event: payload) + entry = handle_expired_entry(cached_entry, key, options) + entry = nil if entry && entry.mismatched?(normalize_version(name, options)) + payload[:super_operation] = :fetch if payload + payload[:hit] = !!entry if payload + end end if entry get_entry_value(entry, name, options) else @@ -352,10 +491,11 @@ # <tt>:version</tt> options, both of these conditions are applied before # the data is returned. # # ==== Options # + # * +:namespace+ - Replace the store namespace for this call. # * +:version+ - Specifies a version for the cache entry. If the cached # version does not match the requested version, the read will be treated # as a cache miss. This feature is used to support recyclable cache keys. # # Other options will be handled by the specific cache store implementation. @@ -391,25 +531,29 @@ # # Some cache implementation may optimize this method. # # Returns a hash mapping the names provided to the values found. def read_multi(*names) + return {} if names.empty? + options = names.extract_options! options = merged_options(options) - instrument :read_multi, names, options do |payload| + instrument_multi :read_multi, names, options do |payload| read_multi_entries(names, **options, event: payload).tap do |results| payload[:hits] = results.keys end end end # Cache Storage API to write multiple values at once. def write_multi(hash, options = nil) + return hash if hash.empty? + options = merged_options(options) - instrument :write_multi, hash, options do |payload| + instrument_multi :write_multi, hash, options do |payload| entries = hash.each_with_object({}) do |(name, value), memo| memo[normalize_key(name, options)] = Entry.new(value, **options.merge(version: normalize_version(name, options))) end write_multi_entries entries, **options @@ -431,11 +575,12 @@ # "Fallback value for key: #{key}" # end # # => { "bim" => "bam", # # "unknown_key" => "Fallback value for key: unknown_key" } # - # Options are passed to the underlying cache implementation. For example: + # You may also specify additional options via the +options+ argument. See #fetch for details. + # Other options are passed to the underlying cache implementation. For example: # # cache.fetch_multi("fizz", expires_in: 5.seconds) do |key| # "buzz" # end # # => {"fizz"=>"buzz"} @@ -444,20 +589,27 @@ # sleep(6) # cache.read("fizz") # # => nil def fetch_multi(*names) raise ArgumentError, "Missing block: `Cache#fetch_multi` requires a block." unless block_given? + return {} if names.empty? options = names.extract_options! options = merged_options(options) - instrument :read_multi, names, options do |payload| - reads = read_multi_entries(names, **options) + instrument_multi :read_multi, names, options do |payload| + if options[:force] + reads = {} + else + reads = read_multi_entries(names, **options) + end + writes = {} ordered = names.index_with do |name| reads.fetch(name) { writes[name] = yield(name) } end + writes.compact! if options[:skip_nil] payload[:hits] = reads.keys payload[:super_operation] = :fetch_multi write_multi(writes, options) @@ -506,29 +658,33 @@ entry = Entry.new(value, **options.merge(version: normalize_version(name, options))) write_entry(normalize_key(name, options), entry, **options) end end - # Deletes an entry in the cache. Returns +true+ if an entry is deleted. + # Deletes an entry in the cache. Returns +true+ if an entry is deleted + # and +false+ otherwise. # # Options are passed to the underlying cache implementation. def delete(name, options = nil) options = merged_options(options) instrument(:delete, name) do delete_entry(normalize_key(name, options), **options) end end - # Deletes multiple entries in the cache. + # Deletes multiple entries in the cache. Returns the number of deleted + # entries. # # Options are passed to the underlying cache implementation. def delete_multi(names, options = nil) + return 0 if names.empty? + options = merged_options(options) names.map! { |key| normalize_key(key, options) } - instrument :delete_multi, names do + instrument_multi :delete_multi, names do delete_multi_entries(names, **options) end end # Returns +true+ if the cache contains an entry for the given key. @@ -592,12 +748,27 @@ def clear(options = nil) raise NotImplementedError.new("#{self.class.name} does not support clear") end private - def default_coder - Coders[Cache.format_version] + def default_serializer + case Cache.format_version + when 6.1 + ActiveSupport.deprecator.warn <<~EOM + Support for `config.active_support.cache_format_version = 6.1` has been deprecated and will be removed in Rails 7.2. + + Check the Rails upgrade guide at https://guides.rubyonrails.org/upgrading_ruby_on_rails.html#new-activesupport-cache-serialization-format + for more information on how to upgrade. + EOM + Cache::SerializerWithFallback[:marshal_6_1] + when 7.0 + Cache::SerializerWithFallback[:marshal_7_0] + when 7.1 + Cache::SerializerWithFallback[:marshal_7_1] + else + raise ArgumentError, "Unrecognized ActiveSupport::Cache.format_version: #{Cache.format_version.inspect}" + end end # Adds the namespace defined in the options to a pattern designed to # match keys. Implementations that support delete_matched should call # this method to translate a pattern that matches names into one that @@ -630,18 +801,20 @@ end def serialize_entry(entry, **options) options = merged_options(options) if @coder_supports_compression && options[:compress] - @coder.dump_compressed(entry, options[:compress_threshold] || DEFAULT_COMPRESS_LIMIT) + @coder.dump_compressed(entry, options[:compress_threshold]) else @coder.dump(entry) end end def deserialize_entry(payload) payload.nil? ? nil : @coder.load(payload) + rescue DeserializationError + nil end # Reads multiple entries from the cache implementation. Subclasses MAY # implement this method. def read_multi_entries(names, **options) @@ -683,20 +856,46 @@ # Merges the default options with ones specific to a method call. def merged_options(call_options) if call_options call_options = normalize_options(call_options) + if call_options.key?(:expires_in) && call_options.key?(:expires_at) + raise ArgumentError, "Either :expires_in or :expires_at can be supplied, but not both" + end + + expires_at = call_options.delete(:expires_at) + call_options[:expires_in] = (expires_at - Time.now) if expires_at + + if call_options[:expires_in].is_a?(Time) + expires_in = call_options[:expires_in] + raise ArgumentError.new("expires_in parameter should not be a Time. Did you mean to use expires_at? Got: #{expires_in}") + end + if call_options[:expires_in]&.negative? + expires_in = call_options.delete(:expires_in) + handle_invalid_expires_in("Cache expiration time is invalid, cannot be negative: #{expires_in}") + end + if options.empty? call_options else options.merge(call_options) end else options end end + def handle_invalid_expires_in(message) + error = ArgumentError.new(message) + if ActiveSupport::Cache::Store.raise_on_invalid_cache_expiration_time + raise error + else + ActiveSupport.error_reporter&.report(error, handled: true, severity: :warning) + logger.error("#{error.class}: #{error.message}") if logger + end + end + # Normalize aliased options to their canonical form def normalize_options(options) options = options.dup OPTION_ALIASES.each do |canonical_name, aliases| alias_key = aliases.detect { |key| options.key?(key) } @@ -705,14 +904,35 @@ end options end - # Expands and namespaces the cache key. May be overridden by - # cache stores to do additional normalization. + def validate_options(options) + if options.key?(:coder) && options[:serializer] + raise ArgumentError, "Cannot specify :serializer and :coder options together" + end + + if options.key?(:coder) && options[:compressor] + raise ArgumentError, "Cannot specify :compressor and :coder options together" + end + + if Cache.format_version < 7.1 && !options[:serializer] && options[:compressor] + raise ArgumentError, "Cannot specify :compressor option when using" \ + " default serializer and cache format version is < 7.1" + end + + options + end + + # Expands and namespaces the cache key. + # Raises an exception when the key is +nil+ or an empty string. + # May be overridden by cache stores to do additional normalization. def normalize_key(key, options = nil) - namespace_key expanded_key(key), options + str_key = expanded_key(key) + raise(ArgumentError, "key cannot be blank") if !str_key || str_key.empty? + + namespace_key str_key, options end # Prefix the key with a namespace string: # # namespace_key 'foo', namespace: 'cache' @@ -771,18 +991,37 @@ when key.is_a?(Array) then key.map { |element| expanded_version(element) }.tap(&:compact!).to_param when key.respond_to?(:to_a) then expanded_version(key.to_a) end end - def instrument(operation, key, options = nil) + def instrument(operation, key, options = nil, &block) + _instrument(operation, key: key, options: options, &block) + end + + def instrument_multi(operation, keys, options = nil, &block) + _instrument(operation, multi: true, key: keys, options: options, &block) + end + + def _instrument(operation, multi: false, options: nil, **payload, &block) if logger && logger.debug? && !silence? - logger.debug "Cache #{operation}: #{normalize_key(key, options)}#{options.blank? ? "" : " (#{options.inspect})"}" + debug_key = + if multi + ": #{payload[:key].size} key(s) specified" + elsif payload[:key] + ": #{normalize_key(payload[:key], options)}" + end + + debug_options = " (#{options.inspect})" unless options.blank? + + logger.debug "Cache #{operation}#{debug_key}#{debug_options}" end - payload = { key: key, store: self.class.name } + payload[:store] = self.class.name payload.merge!(options) if options.is_a?(Hash) - ActiveSupport::Notifications.instrument("cache_#{operation}.active_support", payload) { yield(payload) } + ActiveSupport::Notifications.instrument("cache_#{operation}.active_support", payload) do + block&.call(payload) + end end def handle_expired_entry(entry, key, options) if entry && entry.expired? race_ttl = options[:race_condition_ttl].to_i @@ -798,233 +1037,52 @@ end entry end def get_entry_value(entry, name, options) - instrument(:fetch_hit, name, options) { } + instrument(:fetch_hit, name, options) entry.value end def save_block_result_to_cache(name, options) result = instrument(:generate, name, options) do - yield(name) + yield(name, WriteOptions.new(options)) end write(name, result, options) unless result.nil? && options[:skip_nil] result end end - module NullCoder # :nodoc: - extend self - - def dump(entry) - entry + class WriteOptions + def initialize(options) # :nodoc: + @options = options end - def dump_compressed(entry, threshold) - entry.compressed(threshold) + def version + @options[:version] end - def load(payload) - payload + def version=(version) + @options[:version] = version end - end - module Coders # :nodoc: - MARK_61 = "\x04\b".b.freeze # The one set by Marshal. - MARK_70_UNCOMPRESSED = "\x00".b.freeze - MARK_70_COMPRESSED = "\x01".b.freeze - - class << self - def [](version) - case version - when 6.1 - Rails61Coder - when 7.0 - Rails70Coder - else - raise ArgumentError, "Unknown ActiveSupport::Cache.format_version: #{Cache.format_version.inspect}" - end - end + def expires_in + @options[:expires_in] end - module Loader - extend self - - def load(payload) - if !payload.is_a?(String) - ActiveSupport::Cache::Store.logger&.warn %{Payload wasn't a string, was #{payload.class.name} - couldn't unmarshal, so returning nil."} - - return nil - elsif payload.start_with?(MARK_70_UNCOMPRESSED) - members = Marshal.load(payload.byteslice(1..-1)) - elsif payload.start_with?(MARK_70_COMPRESSED) - members = Marshal.load(Zlib::Inflate.inflate(payload.byteslice(1..-1))) - elsif payload.start_with?(MARK_61) - return Marshal.load(payload) - else - ActiveSupport::Cache::Store.logger&.warn %{Invalid cache prefix: #{payload.byteslice(0).inspect}, expected "\\x00" or "\\x01"} - - return nil - end - Entry.unpack(members) - end + def expires_in=(expires_in) + @options.delete(:expires_at) + @options[:expires_in] = expires_in end - module Rails61Coder - include Loader - extend self - - def dump(entry) - Marshal.dump(entry) - end - - def dump_compressed(entry, threshold) - Marshal.dump(entry.compressed(threshold)) - end - end - - module Rails70Coder - include Loader - extend self - - def dump(entry) - MARK_70_UNCOMPRESSED + Marshal.dump(entry.pack) - end - - def dump_compressed(entry, threshold) - payload = Marshal.dump(entry.pack) - if payload.bytesize >= threshold - compressed_payload = Zlib::Deflate.deflate(payload) - if compressed_payload.bytesize < payload.bytesize - return MARK_70_COMPRESSED + compressed_payload - end - end - - MARK_70_UNCOMPRESSED + payload - end - end - end - - # This class is used to represent cache entries. Cache entries have a value, an optional - # expiration time, and an optional version. The expiration time is used to support the :race_condition_ttl option - # on the cache. The version is used to support the :version option on the cache for rejecting - # mismatches. - # - # Since cache entries in most instances will be serialized, the internals of this class are highly optimized - # using short instance variable names that are lazily defined. - class Entry # :nodoc: - class << self - def unpack(members) - new(members[0], expires_at: members[1], version: members[2]) - end - end - - attr_reader :version - - # Creates a new cache entry for the specified value. Options supported are - # +:compressed+, +:version+, +:expires_at+ and +:expires_in+. - def initialize(value, compressed: false, version: nil, expires_in: nil, expires_at: nil, **) - @value = value - @version = version - @created_at = 0.0 - @expires_in = expires_at&.to_f || expires_in && (expires_in.to_f + Time.now.to_f) - @compressed = true if compressed - end - - def value - compressed? ? uncompress(@value) : @value - end - - def mismatched?(version) - @version && version && @version != version - end - - # Checks if the entry is expired. The +expires_in+ parameter can override - # the value set when the entry was created. - def expired? - @expires_in && @created_at + @expires_in <= Time.now.to_f - end - def expires_at - @expires_in ? @created_at + @expires_in : nil + @options[:expires_at] end - def expires_at=(value) - if value - @expires_in = value.to_f - @created_at - else - @expires_in = nil - end + def expires_at=(expires_at) + @options.delete(:expires_in) + @options[:expires_at] = expires_at end - - # Returns the size of the cached value. This could be less than - # <tt>value.bytesize</tt> if the data is compressed. - def bytesize - case value - when NilClass - 0 - when String - @value.bytesize - else - @s ||= Marshal.dump(@value).bytesize - end - end - - def compressed? # :nodoc: - defined?(@compressed) - end - - def compressed(compress_threshold) - return self if compressed? - - case @value - when nil, true, false, Numeric - uncompressed_size = 0 - when String - uncompressed_size = @value.bytesize - else - serialized = Marshal.dump(@value) - uncompressed_size = serialized.bytesize - end - - if uncompressed_size >= compress_threshold - serialized ||= Marshal.dump(@value) - compressed = Zlib::Deflate.deflate(serialized) - - if compressed.bytesize < uncompressed_size - return Entry.new(compressed, compressed: true, expires_at: expires_at, version: version) - end - end - self - end - - def local? - false - end - - # Duplicates the value in a class. This is used by cache implementations that don't natively - # serialize entries to protect against accidental cache modifications. - def dup_value! - if @value && !compressed? && !(@value.is_a?(Numeric) || @value == true || @value == false) - if @value.is_a?(String) - @value = @value.dup - else - @value = Marshal.load(Marshal.dump(@value)) - end - end - end - - def pack - members = [value, expires_at, version] - members.pop while !members.empty? && members.last.nil? - members - end - - private - def uncompress(value) - Marshal.load(Zlib::Inflate.inflate(value)) - end end end end