lib/suo/client/base.rb in suo-0.1.1 vs lib/suo/client/base.rb in suo-0.1.2

- old
+ new

@@ -1,16 +1,16 @@ module Suo module Client class Base DEFAULT_OPTIONS = { - retry_count: 3, + retry_timeout: 0.1, retry_delay: 0.01, stale_lock_expiration: 3600 }.freeze def initialize(options = {}) - @options = self.class.merge_defaults(options).merge(_initialized: true) + @options = self.class.merge_defaults(options) end def lock(key, resources = 1, options = {}) options = self.class.merge_defaults(@options.merge(options)) token = self.class.lock(key, resources, options) @@ -31,63 +31,154 @@ def locked?(key, resources = 1) self.class.locked?(key, resources, @options) end class << self - def lock(key, resources = 1, options = {}) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError + def lock(key, resources = 1, options = {}) + options = merge_defaults(options) + acquisition_token = nil + token = SecureRandom.base64(16) + + retry_with_timeout(key, options) do + val, cas = get(key, options) + + if val.nil? + set_initial(key, options) + next + end + + locks = deserialize_and_clear_locks(val, options) + + if locks.size < resources + add_lock(locks, token) + + newval = serialize_locks(locks) + + if set(key, newval, cas, options) + acquisition_token = token + break + end + end + end + + acquisition_token end def locked?(key, resources = 1, options = {}) - options = merge_defaults(options) - client = options[:client] - locks = deserialize_locks(client.get(key)) - - locks.size >= resources + locks(key, options).size >= resources end def locks(key, options) options = merge_defaults(options) - client = options[:client] - locks = deserialize_locks(client.get(key)) + val, _ = get(key, options) + locks = deserialize_locks(val) - locks.size + locks end - def refresh(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError + def refresh(key, acquisition_token, options = {}) + options = merge_defaults(options) + + retry_with_timeout(key, options) do + val, cas = get(key, options) + + if val.nil? + set_initial(key, options) + next + end + + locks = deserialize_and_clear_locks(val, options) + + refresh_lock(locks, acquisition_token) + + break if set(key, serialize_locks(locks), cas, options) + end end - def unlock(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError + def unlock(key, acquisition_token, options = {}) + options = merge_defaults(options) + + return unless acquisition_token + + retry_with_timeout(key, options) do + val, cas = get(key, options) + + break if val.nil? + + locks = deserialize_and_clear_locks(val, options) + + acquisition_lock = remove_lock(locks, acquisition_token) + + break unless acquisition_lock + break if set(key, serialize_locks(locks), cas, options) + end + rescue FailedToAcquireLock => _ # rubocop:disable Lint/HandleExceptions + # ignore - assume success due to optimistic locking end def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument fail NotImplementedError end def merge_defaults(options = {}) - unless options[:_initialized] - options = self::DEFAULT_OPTIONS.merge(options) + options = self::DEFAULT_OPTIONS.merge(options) - fail "Client required" unless options[:client] - end + fail "Client required" unless options[:client] - if options[:retry_timeout] - options[:retry_count] = (options[:retry_timeout] / options[:retry_delay].to_f).floor - end + options[:retry_count] = (options[:retry_timeout] / options[:retry_delay].to_f).ceil options end private + def get(key, options) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end + + def set(key, newval, oldval, options) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end + + def set_initial(key, options) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end + + def synchronize(key, options) + yield(key, options) + end + + def retry_with_timeout(key, options) + start = Time.now.to_f + + options[:retry_count].times do + if options[:retry_timeout] + now = Time.now.to_f + break if now - start > options[:retry_timeout] + end + + synchronize(key, options) do + yield + end + + sleep(rand(options[:retry_delay] * 1000).to_f / 1000) + end + rescue => _ + raise FailedToAcquireLock + end + def serialize_locks(locks) MessagePack.pack(locks.map { |time, token| [time.to_f, token] }) end + def deserialize_and_clear_locks(val, options) + clear_expired_locks(deserialize_locks(val), options) + end + def deserialize_locks(val) - MessagePack.unpack(val).map do |time, token| + unpacked = (val.nil? || val == "") ? [] : MessagePack.unpack(val) + + unpacked.map do |time, token| [Time.at(time), token] end rescue EOFError => _ [] end