module Suo module Client class Base DEFAULT_OPTIONS = { acquisition_timeout: 0.1, acquisition_delay: 0.01, stale_lock_expiration: 3600, resources: 1 }.freeze attr_accessor :client, :key, :resources, :options include MonitorMixin def initialize(key, options = {}) fail "Client required" unless options[:client] @options = DEFAULT_OPTIONS.merge(options) @retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil @client = @options[:client] @resources = @options[:resources].to_i @key = key super() # initialize Monitor mixin for thread safety end def lock token = acquire_lock if block_given? && token begin yield ensure unlock(token) end else token end end def locked? locks.size >= resources end def locks val, _ = get cleared_locks = deserialize_and_clear_locks(val) cleared_locks end def refresh(token) retry_with_timeout do val, cas = get if val.nil? initial_set next end cleared_locks = deserialize_and_clear_locks(val) refresh_lock(cleared_locks, token) break if set(serialize_locks(cleared_locks), cas) end end def unlock(token) return unless token retry_with_timeout do val, cas = get break if val.nil? cleared_locks = deserialize_and_clear_locks(val) acquisition_lock = remove_lock(cleared_locks, token) break unless acquisition_lock break if set(serialize_locks(cleared_locks), cas) end rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions # ignore - assume success due to optimistic locking end def clear fail NotImplementedError end private attr_accessor :retry_count def acquire_lock token = SecureRandom.base64(16) retry_with_timeout do val, cas = get if val.nil? initial_set next end cleared_locks = deserialize_and_clear_locks(val) if cleared_locks.size < resources add_lock(cleared_locks, token) newval = serialize_locks(cleared_locks) return token if set(newval, cas) end end nil end def get fail NotImplementedError end def set(newval, cas) # rubocop:disable Lint/UnusedMethodArgument fail NotImplementedError end def initial_set(val = "") # rubocop:disable Lint/UnusedMethodArgument fail NotImplementedError end def synchronize mon_synchronize { yield } end def retry_with_timeout start = Time.now.to_f retry_count.times do elapsed = Time.now.to_f - start break if elapsed >= options[:acquisition_timeout] synchronize do yield end sleep(rand(options[:acquisition_delay] * 1000).to_f / 1000) end rescue => _ raise LockClientError end def serialize_locks(locks) MessagePack.pack(locks.map { |time, token| [time.to_f, token] }) end def deserialize_and_clear_locks(val) clear_expired_locks(deserialize_locks(val)) end def deserialize_locks(val) unpacked = (val.nil? || val == "") ? [] : MessagePack.unpack(val) unpacked.map do |time, token| [Time.at(time), token] end rescue EOFError, MessagePack::MalformedFormatError => _ [] end def clear_expired_locks(locks) expired = Time.now - options[:stale_lock_expiration] locks.reject { |time, _| time < expired } end def add_lock(locks, token, time = Time.now.to_f) locks << [time, token] end def remove_lock(locks, acquisition_token) lock = locks.find { |_, token| token == acquisition_token } locks.delete(lock) end def refresh_lock(locks, acquisition_token) remove_lock(locks, acquisition_token) add_lock(locks, acquisition_token) end end end end