# frozen_string_literal: true
begin
gem "redis", ">= 4.0.1"
require "redis"
require "redis/distributed"
rescue LoadError
warn "The Redis cache store requires the redis gem, version 4.0.1 or later. Please add it to your Gemfile: `gem \"redis\", \">= 4.0.1\"`"
raise
end
require "connection_pool"
require "active_support/core_ext/array/wrap"
require "active_support/core_ext/hash/slice"
require "active_support/core_ext/numeric/time"
require "active_support/digest"
module ActiveSupport
module Cache
# = Redis \Cache \Store
#
# Deployment note: Take care to use a dedicated Redis cache rather
# than pointing this at a persistent Redis server (for example, one used as
# an Active Job queue). Redis won't cope well with mixed usage patterns and it
# won't expire cache entries by default.
#
# Redis cache server setup guide: https://redis.io/topics/lru-cache
#
# * Supports vanilla Redis, hiredis, and +Redis::Distributed+.
# * Supports Memcached-like sharding across Redises with +Redis::Distributed+.
# * Fault tolerant. If the Redis server is unavailable, no exceptions are
# raised. Cache fetches are all misses and writes are dropped.
# * Local cache. Hot in-memory primary cache within block/middleware scope.
# * +read_multi+ and +write_multi+ support for Redis mget/mset. Use
# +Redis::Distributed+ 4.0.1+ for distributed mget support.
# * +delete_matched+ support for Redis KEYS globs.
class RedisCacheStore < Store
# Keys are truncated with the Active Support digest if they exceed 1kB
MAX_KEY_BYTESIZE = 1024
DEFAULT_REDIS_OPTIONS = {
connect_timeout: 1,
read_timeout: 1,
write_timeout: 1,
}
DEFAULT_ERROR_HANDLER = -> (method:, returning:, exception:) do
if logger
logger.error { "RedisCacheStore: #{method} failed, returned #{returning.inspect}: #{exception.class}: #{exception.message}" }
end
ActiveSupport.error_reporter&.report(
exception,
severity: :warning,
source: "redis_cache_store.active_support",
)
end
# The maximum number of entries to receive per SCAN call.
SCAN_BATCH_SIZE = 1000
private_constant :SCAN_BATCH_SIZE
# Advertise cache versioning support.
def self.supports_cache_versioning?
true
end
prepend Strategy::LocalCache
class << self
# Factory method to create a new Redis instance.
#
# Handles four options: :redis block, :redis instance, single :url
# string, and multiple :url strings.
#
# Option Class Result
# :redis Proc -> options[:redis].call
# :redis Object -> options[:redis]
# :url String -> Redis.new(url: …)
# :url Array -> Redis::Distributed.new([{ url: … }, { url: … }, …])
#
def build_redis(redis: nil, url: nil, **redis_options) # :nodoc:
urls = Array(url)
if redis.is_a?(Proc)
redis.call
elsif redis
redis
elsif urls.size > 1
build_redis_distributed_client(urls: urls, **redis_options)
elsif urls.empty?
build_redis_client(**redis_options)
else
build_redis_client(url: urls.first, **redis_options)
end
end
private
def build_redis_distributed_client(urls:, **redis_options)
::Redis::Distributed.new([], DEFAULT_REDIS_OPTIONS.merge(redis_options)).tap do |dist|
urls.each { |u| dist.add_node url: u }
end
end
def build_redis_client(**redis_options)
::Redis.new(DEFAULT_REDIS_OPTIONS.merge(redis_options))
end
end
attr_reader :max_key_bytesize
attr_reader :redis
# Creates a new Redis cache store.
#
# There are four ways to provide the Redis client used by the cache: the
# +:redis+ param can be a Redis instance or a block that returns a Redis
# instance, or the +:url+ param can be a string or an array of strings
# which will be used to create a Redis instance or a +Redis::Distributed+
# instance.
#
# Option Class Result
# :redis Proc -> options[:redis].call
# :redis Object -> options[:redis]
# :url String -> Redis.new(url: …)
# :url Array -> Redis::Distributed.new([{ url: … }, { url: … }, …])
#
# No namespace is set by default. Provide one if the Redis cache
# server is shared with other apps: namespace: 'myapp-cache'.
#
# Compression is enabled by default with a 1kB threshold, so cached
# values larger than 1kB are automatically compressed. Disable by
# passing compress: false or change the threshold by passing
# compress_threshold: 4.kilobytes.
#
# No expiry is set on cache entries by default. Redis is expected to
# be configured with an eviction policy that automatically deletes
# least-recently or -frequently used keys when it reaches max memory.
# See https://redis.io/topics/lru-cache for cache server setup.
#
# Race condition TTL is not set by default. This can be used to avoid
# "thundering herd" cache writes when hot cache entries are expired.
# See ActiveSupport::Cache::Store#fetch for more.
#
# Setting skip_nil: true will not cache nil results:
#
# cache.fetch('foo') { nil }
# cache.fetch('bar', skip_nil: true) { nil }
# cache.exist?('foo') # => true
# cache.exist?('bar') # => false
def initialize(error_handler: DEFAULT_ERROR_HANDLER, **redis_options)
universal_options = redis_options.extract!(*UNIVERSAL_OPTIONS)
if pool_options = self.class.send(:retrieve_pool_options, redis_options)
@redis = ::ConnectionPool.new(pool_options) { self.class.build_redis(**redis_options) }
else
@redis = self.class.build_redis(**redis_options)
end
@max_key_bytesize = MAX_KEY_BYTESIZE
@error_handler = error_handler
super(universal_options)
end
def inspect
"#<#{self.class} options=#{options.inspect} redis=#{redis.inspect}>"
end
# Cache Store API implementation.
#
# Read multiple values at once. Returns a hash of requested keys ->
# fetched values.
def read_multi(*names)
return {} if names.empty?
options = names.extract_options!
instrument_multi(:read_multi, names, options) do |payload|
read_multi_entries(names, **options).tap do |results|
payload[:hits] = results.keys
end
end
end
# Cache Store API implementation.
#
# Supports Redis KEYS glob patterns:
#
# h?llo matches hello, hallo and hxllo
# h*llo matches hllo and heeeello
# h[ae]llo matches hello and hallo, but not hillo
# h[^e]llo matches hallo, hbllo, ... but not hello
# h[a-b]llo matches hallo and hbllo
#
# Use \ to escape special characters if you want to match them verbatim.
#
# See https://redis.io/commands/KEYS for more.
#
# Failsafe: Raises errors.
def delete_matched(matcher, options = nil)
instrument :delete_matched, matcher do
unless String === matcher
raise ArgumentError, "Only Redis glob strings are supported: #{matcher.inspect}"
end
redis.then do |c|
pattern = namespace_key(matcher, options)
cursor = "0"
# Fetch keys in batches using SCAN to avoid blocking the Redis server.
nodes = c.respond_to?(:nodes) ? c.nodes : [c]
nodes.each do |node|
begin
cursor, keys = node.scan(cursor, match: pattern, count: SCAN_BATCH_SIZE)
node.del(*keys) unless keys.empty?
end until cursor == "0"
end
end
end
end
# Increment a cached integer value using the Redis incrby atomic operator.
# Returns the updated value.
#
# If the key is unset or has expired, it will be set to +amount+:
#
# cache.increment("foo") # => 1
# cache.increment("bar", 100) # => 100
#
# To set a specific value, call #write passing raw: true:
#
# cache.write("baz", 5, raw: true)
# cache.increment("baz") # => 6
#
# Incrementing a non-numeric value, or a value written without
# raw: true, will fail and return +nil+.
#
# Failsafe: Raises errors.
def increment(name, amount = 1, options = nil)
instrument :increment, name, amount: amount do
failsafe :increment do
options = merged_options(options)
key = normalize_key(name, options)
change_counter(key, amount, options)
end
end
end
# Decrement a cached integer value using the Redis decrby atomic operator.
# Returns the updated value.
#
# If the key is unset or has expired, it will be set to +-amount+:
#
# cache.decrement("foo") # => -1
#
# To set a specific value, call #write passing raw: true:
#
# cache.write("baz", 5, raw: true)
# cache.decrement("baz") # => 4
#
# Decrementing a non-numeric value, or a value written without
# raw: true, will fail and return +nil+.
#
# Failsafe: Raises errors.
def decrement(name, amount = 1, options = nil)
instrument :decrement, name, amount: amount do
failsafe :decrement do
options = merged_options(options)
key = normalize_key(name, options)
change_counter(key, -amount, options)
end
end
end
# Cache Store API implementation.
#
# Removes expired entries. Handled natively by Redis least-recently-/
# least-frequently-used expiry, so manual cleanup is not supported.
def cleanup(options = nil)
super
end
# Clear the entire cache on all Redis servers. Safe to use on
# shared servers if the cache is namespaced.
#
# Failsafe: Raises errors.
def clear(options = nil)
failsafe :clear do
if namespace = merged_options(options)[:namespace]
delete_matched "*", namespace: namespace
else
redis.then { |c| c.flushdb }
end
end
end
# Get info from redis servers.
def stats
redis.then { |c| c.info }
end
private
def pipeline_entries(entries, &block)
redis.then { |c|
if c.is_a?(Redis::Distributed)
entries.group_by { |k, _v| c.node_for(k) }.each do |node, sub_entries|
node.pipelined { |pipe| yield(pipe, sub_entries) }
end
else
c.pipelined { |pipe| yield(pipe, entries) }
end
}
end
# Store provider interface:
# Read an entry from the cache.
def read_entry(key, **options)
deserialize_entry(read_serialized_entry(key, **options), **options)
end
def read_serialized_entry(key, raw: false, **options)
failsafe :read_entry do
redis.then { |c| c.get(key) }
end
end
def read_multi_entries(names, **options)
options = merged_options(options)
return {} if names == []
raw = options&.fetch(:raw, false)
keys = names.map { |name| normalize_key(name, options) }
values = failsafe(:read_multi_entries, returning: {}) do
redis.then { |c| c.mget(*keys) }
end
names.zip(values).each_with_object({}) do |(name, value), results|
if value
entry = deserialize_entry(value, raw: raw)
unless entry.nil? || entry.expired? || entry.mismatched?(normalize_version(name, options))
begin
results[name] = entry.value
rescue DeserializationError
end
end
end
end
end
# Write an entry to the cache.
#
# Requires Redis 2.6.12+ for extended SET options.
def write_entry(key, entry, raw: false, **options)
write_serialized_entry(key, serialize_entry(entry, raw: raw, **options), raw: raw, **options)
end
def write_serialized_entry(key, payload, raw: false, unless_exist: false, expires_in: nil, race_condition_ttl: nil, pipeline: nil, **options)
# If race condition TTL is in use, ensure that cache entries
# stick around a bit longer after they would have expired
# so we can purposefully serve stale entries.
if race_condition_ttl && expires_in && expires_in > 0 && !raw
expires_in += 5.minutes
end
modifiers = {}
if unless_exist || expires_in
modifiers[:nx] = unless_exist
modifiers[:px] = (1000 * expires_in.to_f).ceil if expires_in
end
if pipeline
pipeline.set(key, payload, **modifiers)
else
failsafe :write_entry, returning: false do
redis.then { |c| c.set key, payload, **modifiers }
end
end
end
# Delete an entry from the cache.
def delete_entry(key, **options)
failsafe :delete_entry, returning: false do
redis.then { |c| c.del(key) == 1 }
end
end
# Deletes multiple entries in the cache. Returns the number of entries deleted.
def delete_multi_entries(entries, **_options)
failsafe :delete_multi_entries, returning: 0 do
redis.then { |c| c.del(entries) }
end
end
# Nonstandard store provider API to write multiple values at once.
def write_multi_entries(entries, **options)
return if entries.empty?
failsafe :write_multi_entries do
pipeline_entries(entries) do |pipeline, sharded_entries|
options = options.dup
options[:pipeline] = pipeline
sharded_entries.each do |key, entry|
write_entry key, entry, **options
end
end
end
end
# Truncate keys that exceed 1kB.
def normalize_key(key, options)
truncate_key super&.b
end
def truncate_key(key)
if key && key.bytesize > max_key_bytesize
suffix = ":hash:#{ActiveSupport::Digest.hexdigest(key)}"
truncate_at = max_key_bytesize - suffix.bytesize
"#{key.byteslice(0, truncate_at)}#{suffix}"
else
key
end
end
def deserialize_entry(payload, raw: false, **)
if raw && !payload.nil?
Entry.new(payload)
else
super(payload)
end
end
def serialize_entry(entry, raw: false, **options)
if raw
entry.value.to_s
else
super(entry, raw: raw, **options)
end
end
def serialize_entries(entries, **options)
entries.transform_values do |entry|
serialize_entry(entry, **options)
end
end
def change_counter(key, amount, options)
redis.then do |c|
c = c.node_for(key) if c.is_a?(Redis::Distributed)
expires_in = options[:expires_in]
if expires_in
if supports_expire_nx?
count, _ = c.pipelined do |pipeline|
pipeline.incrby(key, amount)
pipeline.call(:expire, key, expires_in.to_i, "NX")
end
else
count, ttl = c.pipelined do |pipeline|
pipeline.incrby(key, amount)
pipeline.ttl(key)
end
c.expire(key, expires_in.to_i) if ttl < 0
end
else
count = c.incrby(key, amount)
end
count
end
end
def supports_expire_nx?
return @supports_expire_nx if defined?(@supports_expire_nx)
redis_versions = redis.then { |c| Array.wrap(c.info("server")).pluck("redis_version") }
@supports_expire_nx = redis_versions.all? { |v| Gem::Version.new(v) >= Gem::Version.new("7.0.0") }
end
def failsafe(method, returning: nil)
yield
rescue ::Redis::BaseError => error
@error_handler&.call(method: method, exception: error, returning: returning)
returning
end
end
end
end