lib/suo/client/base.rb in suo-0.2.3 vs lib/suo/client/base.rb in suo-0.3.0

- old
+ new

@@ -2,145 +2,150 @@ module Client class Base DEFAULT_OPTIONS = { acquisition_timeout: 0.1, acquisition_delay: 0.01, - stale_lock_expiration: 3600 + stale_lock_expiration: 3600, + resources: 1 }.freeze - attr_accessor :client + attr_accessor :client, :key, :resources, :options include MonitorMixin - def initialize(options = {}) + def initialize(key, options = {}) fail "Client required" unless options[:client] @options = DEFAULT_OPTIONS.merge(options) @retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil @client = @options[:client] - super() + @resources = @options[:resources].to_i + @key = key + super() # initialize Monitor mixin for thread safety end - def lock(key, resources = 1) - token = acquire_lock(key, resources) + def lock + token = acquire_lock if block_given? && token begin - yield(token) + yield ensure - unlock(key, token) + unlock(token) end else token end end - def locked?(key, resources = 1) - locks(key).size >= resources + def locked? + locks.size >= resources end - def locks(key) - val, _ = get(key) + def locks + val, _ = get cleared_locks = deserialize_and_clear_locks(val) cleared_locks end - def refresh(key, acquisition_token) - retry_with_timeout(key) do - val, cas = get(key) + def refresh(token) + retry_with_timeout do + val, cas = get if val.nil? - initial_set(key) + initial_set next end cleared_locks = deserialize_and_clear_locks(val) - refresh_lock(cleared_locks, acquisition_token) + refresh_lock(cleared_locks, token) - break if set(key, serialize_locks(cleared_locks), cas) + break if set(serialize_locks(cleared_locks), cas) end end - def unlock(key, acquisition_token) - return unless acquisition_token + def unlock(token) + return unless token - retry_with_timeout(key) do - val, cas = get(key) + retry_with_timeout do + val, cas = get break if val.nil? cleared_locks = deserialize_and_clear_locks(val) - acquisition_lock = remove_lock(cleared_locks, acquisition_token) + acquisition_lock = remove_lock(cleared_locks, token) break unless acquisition_lock - break if set(key, serialize_locks(cleared_locks), cas) + break if set(serialize_locks(cleared_locks), cas) end rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions # ignore - assume success due to optimistic locking end - def clear(key) # rubocop:disable Lint/UnusedMethodArgument + def clear fail NotImplementedError end private - def acquire_lock(key, resources = 1) + attr_accessor :retry_count + + def acquire_lock token = SecureRandom.base64(16) - retry_with_timeout(key) do - val, cas = get(key) + retry_with_timeout do + val, cas = get if val.nil? - initial_set(key) + initial_set next end cleared_locks = deserialize_and_clear_locks(val) if cleared_locks.size < resources add_lock(cleared_locks, token) newval = serialize_locks(cleared_locks) - return token if set(key, newval, cas) + return token if set(newval, cas) end end nil end - def get(key) # rubocop:disable Lint/UnusedMethodArgument + def get fail NotImplementedError end - def set(key, newval, cas) # rubocop:disable Lint/UnusedMethodArgument + def set(newval, cas) # rubocop:disable Lint/UnusedMethodArgument fail NotImplementedError end - def initial_set(key, val = "") # rubocop:disable Lint/UnusedMethodArgument + def initial_set(val = "") # rubocop:disable Lint/UnusedMethodArgument fail NotImplementedError end - def synchronize(key) # rubocop:disable Lint/UnusedMethodArgument + def synchronize mon_synchronize { yield } end - def retry_with_timeout(key) + def retry_with_timeout start = Time.now.to_f - @retry_count.times do + retry_count.times do elapsed = Time.now.to_f - start - break if elapsed >= @options[:acquisition_timeout] + break if elapsed >= options[:acquisition_timeout] - synchronize(key) do + synchronize do yield end - sleep(rand(@options[:acquisition_delay] * 1000).to_f / 1000) + sleep(rand(options[:acquisition_delay] * 1000).to_f / 1000) end rescue => _ raise LockClientError end @@ -161,10 +166,10 @@ rescue EOFError, MessagePack::MalformedFormatError => _ [] end def clear_expired_locks(locks) - expired = Time.now - @options[:stale_lock_expiration] + expired = Time.now - options[:stale_lock_expiration] locks.reject { |time, _| time < expired } end def add_lock(locks, token, time = Time.now.to_f) locks << [time, token]