require 'redis' require 'securerandom' module Redlock include Scripts class Client DEFAULT_REDIS_HOST = ENV["DEFAULT_REDIS_HOST"] || "localhost" DEFAULT_REDIS_PORT = ENV["DEFAULT_REDIS_PORT"] || "6379" DEFAULT_REDIS_URLS = ["redis://#{DEFAULT_REDIS_HOST}:#{DEFAULT_REDIS_PORT}"] DEFAULT_REDIS_TIMEOUT = 0.1 DEFAULT_RETRY_COUNT = 3 DEFAULT_RETRY_DELAY = 200 DEFAULT_RETRY_JITTER = 50 CLOCK_DRIFT_FACTOR = 0.01 ## # Returns default time source function depending on CLOCK_MONOTONIC availability. # def self.default_time_source if defined?(Process::CLOCK_MONOTONIC) proc { (Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1000).to_i } else proc { (Time.now.to_f * 1000).to_i } end end # Create a distributed lock manager implementing redlock algorithm. # Params: # +servers+:: The array of redis connection URLs or Redis connection instances. Or a mix of both. # +options+:: # * `retry_count` being how many times it'll try to lock a resource (default: 3) # * `retry_delay` being how many ms to sleep before try to lock again (default: 200) # * `retry_jitter` being how many ms to jitter retry delay (default: 50) # * `redis_timeout` being how the Redis timeout will be set in seconds (default: 0.1) # * `time_source` being a callable object returning a monotonic time in milliseconds # (default: see #default_time_source) def initialize(servers = DEFAULT_REDIS_URLS, options = {}) redis_timeout = options[:redis_timeout] || DEFAULT_REDIS_TIMEOUT @servers = servers.map do |server| if server.is_a?(String) RedisInstance.new(url: server, timeout: redis_timeout) else RedisInstance.new(server) end end @quorum = (servers.length / 2).to_i + 1 @retry_count = options[:retry_count] || DEFAULT_RETRY_COUNT @retry_delay = options[:retry_delay] || DEFAULT_RETRY_DELAY @retry_jitter = options[:retry_jitter] || DEFAULT_RETRY_JITTER @time_source = options[:time_source] || self.class.default_time_source end # Locks a resource for a given time. # Params: # +resource+:: the resource (or key) string to be locked. # +ttl+:: The time-to-live in ms for the lock. # +options+:: Hash of optional parameters # * +retry_count+: see +initialize+ # * +retry_delay+: see +initialize+ # * +retry_jitter+: see +initialize+ # * +extend+: A lock ("lock_info") to extend. # * +extend_only_if_locked+: Boolean, if +extend+ is given, only acquire lock if currently held # * +extend_only_if_life+: Deprecated, same as +extend_only_if_locked+ # * +extend_life+: Deprecated, same as +extend_only_if_locked+ # +block+:: an optional block to be executed; after its execution, the lock (if successfully # acquired) is automatically unlocked. def lock(resource, ttl, options = {}, &block) lock_info = try_lock_instances(resource, ttl, options) if options[:extend_only_if_life] && !Gem::Deprecate.skip warn 'DEPRECATION WARNING: The `extend_only_if_life` option has been renamed `extend_only_if_locked`.' options[:extend_only_if_locked] = options[:extend_only_if_life] end if options[:extend_life] && !Gem::Deprecate.skip warn 'DEPRECATION WARNING: The `extend_life` option has been renamed `extend_only_if_locked`.' options[:extend_only_if_locked] = options[:extend_life] end if block_given? begin yield lock_info !!lock_info ensure unlock(lock_info) if lock_info end else lock_info end end # Unlocks a resource. # Params: # +lock_info+:: the lock that has been acquired when you locked the resource. def unlock(lock_info) @servers.each { |s| s.unlock(lock_info[:resource], lock_info[:value]) } end # Locks a resource, executing the received block only after successfully acquiring the lock, # and returning its return value as a result. # See Redlock::Client#lock for parameters. def lock!(resource, *args) fail 'No block passed' unless block_given? lock(resource, *args) do |lock_info| raise LockError, resource unless lock_info return yield end end # Gets remaining ttl of a resource. The ttl is returned if the holder # currently holds the lock and it has not expired, otherwise the method # returns nil. # Params: # +lock_info+:: the lock that has been acquired when you locked the resource def get_remaining_ttl_for_lock(lock_info) ttl_info = try_get_remaining_ttl(lock_info[:resource]) return nil if ttl_info.nil? || ttl_info[:value] != lock_info[:value] ttl_info[:ttl] end # Gets remaining ttl of a resource. If there is no valid lock, the method # returns nil. # Params: # +resource+:: the name of the resource (string) for which to check the ttl def get_remaining_ttl_for_resource(resource) ttl_info = try_get_remaining_ttl(resource) return nil if ttl_info.nil? ttl_info[:ttl] end # Checks if a resource is locked # Params: # +lock_info+:: the lock that has been acquired when you locked the resource def locked?(resource) ttl = get_remaining_ttl_for_resource(resource) !(ttl.nil? || ttl.zero?) end # Checks if a lock is still valid # Params: # +lock_info+:: the lock that has been acquired when you locked the resource def valid_lock?(lock_info) ttl = get_remaining_ttl_for_lock(lock_info) !(ttl.nil? || ttl.zero?) end private class RedisInstance module ConnectionPoolLike def with yield self end end def initialize(connection) if connection.respond_to?(:with) @redis = connection else if connection.respond_to?(:client) @redis = connection else @redis = Redis.new(connection) end @redis.extend(ConnectionPoolLike) end end def lock(resource, val, ttl, allow_new_lock) recover_from_script_flush do @redis.with { |conn| conn.evalsha Scripts::LOCK_SCRIPT_SHA, keys: [resource], argv: [val, ttl, allow_new_lock] } end rescue Redis::BaseConnectionError false end def unlock(resource, val) recover_from_script_flush do @redis.with { |conn| conn.evalsha Scripts::UNLOCK_SCRIPT_SHA, keys: [resource], argv: [val] } end rescue # Nothing to do, unlocking is just a best-effort attempt. end def get_remaining_ttl(resource) recover_from_script_flush do @redis.with { |conn| conn.evalsha Scripts::PTTL_SCRIPT_SHA, keys: [resource] } end rescue Redis::BaseConnectionError nil end private def load_scripts scripts = [ Scripts::UNLOCK_SCRIPT, Scripts::LOCK_SCRIPT, Scripts::PTTL_SCRIPT ] scripts.each do |script| @redis.with { |conn| conn.script(:load, script) } end end def recover_from_script_flush retry_on_noscript = true begin yield rescue Redis::CommandError => e # When somebody has flushed the Redis instance's script cache, we might # want to reload our scripts. Only attempt this once, though, to avoid # going into an infinite loop. if retry_on_noscript && e.message.include?('NOSCRIPT') load_scripts retry_on_noscript = false retry else raise end end end end def try_lock_instances(resource, ttl, options) retry_count = options[:retry_count] || @retry_count tries = options[:extend] ? 1 : (retry_count + 1) tries.times do |attempt_number| # Wait a random delay before retrying. sleep(attempt_retry_delay(attempt_number, options)) if attempt_number > 0 lock_info = lock_instances(resource, ttl, options) return lock_info if lock_info end false end def attempt_retry_delay(attempt_number, options) retry_delay = options[:retry_delay] || @retry_delay retry_jitter = options[:retry_jitter] || @retry_jitter retry_delay = if retry_delay.respond_to?(:call) retry_delay.call(attempt_number) else retry_delay end (retry_delay + rand(retry_jitter)).to_f / 1000 end def lock_instances(resource, ttl, options) value = (options[:extend] || { value: SecureRandom.uuid })[:value] allow_new_lock = options[:extend_only_if_locked] ? 'no' : 'yes' locked, time_elapsed = timed do @servers.select { |s| s.lock resource, value, ttl, allow_new_lock }.size end validity = ttl - time_elapsed - drift(ttl) if locked >= @quorum && validity >= 0 { validity: validity, resource: resource, value: value } else @servers.each { |s| s.unlock(resource, value) } false end end def try_get_remaining_ttl(resource) # Responses from the servers are a 2 tuple of format [lock_value, ttl]. # The lock_value is nil if it does not exist. Since servers may have # different lock values, the responses are grouped by the lock_value and # transofrmed into a hash: { lock_value1 => [ttl1, ttl2, ttl3], # lock_value2 => [ttl4, tt5] } ttls_by_value, time_elapsed = timed do @servers.map { |s| s.get_remaining_ttl(resource) } .select { |ttl_tuple| ttl_tuple&.first } .group_by(&:first) .transform_values { |ttl_tuples| ttl_tuples.map { |t| t.last } } end # Authoritative lock value is that which is returned by the majority of # servers authoritative_value, ttls = ttls_by_value.max_by { |(lock_value, ttls)| ttls.length } if ttls && ttls.size >= @quorum # Return the minimum TTL of an N/2+1 selection. It will always be # correct (it will guarantee that at least N/2+1 servers have a TTL that # value or longer) min_ttl = ttls.sort.last(@quorum).first min_ttl = min_ttl - time_elapsed - drift(min_ttl) { value: authoritative_value, ttl: min_ttl } else # No lock_value is authoritatively held for the resource nil end end def drift(ttl) # Add 2 milliseconds to the drift to account for Redis expires # precision, which is 1 millisecond, plus 1 millisecond min drift # for small TTLs. (ttl * CLOCK_DRIFT_FACTOR).to_i + 2 end def timed start_time = @time_source.call() [yield, @time_source.call() - start_time] end end end