lib/suo/client/base.rb in suo-0.1.3 vs lib/suo/client/base.rb in suo-0.2.0

- old
+ new

@@ -1,207 +1,189 @@ module Suo module Client class Base - DEFAULT_OPTIONS = { - retry_timeout: 0.1, - retry_delay: 0.01, + acquisition_timeout: 0.1, + acquisition_delay: 0.01, stale_lock_expiration: 3600 }.freeze + attr_accessor :client + + include MonitorMixin + def initialize(options = {}) - @options = self.class.merge_defaults(options) + fail "Client required" unless options[:client] + @options = DEFAULT_OPTIONS.merge(options) + @retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil + @client = @options[:client] + super() end - def lock(key, resources = 1, options = {}) - options = self.class.merge_defaults(@options.merge(options)) - token = self.class.lock(key, resources, options) + def lock(key, resources = 1) + token = acquire_lock(key, resources) - if token + if block_given? && token begin - yield if block_given? + yield ensure - self.class.unlock(key, token, options) + unlock(key, token) end - - true else - false + token end end def locked?(key, resources = 1) - self.class.locked?(key, resources, @options) + locks(key).size >= resources end - class << self - def lock(key, resources = 1, options = {}) - options = merge_defaults(options) - acquisition_token = nil - token = SecureRandom.base64(16) + def locks(key) + val, _ = get(key) + locks = deserialize_locks(val) - retry_with_timeout(key, options) do - val, cas = get(key, options) + locks + end - if val.nil? - set_initial(key, options) - next - end + def refresh(key, acquisition_token) + retry_with_timeout(key) do + val, cas = get(key) - locks = deserialize_and_clear_locks(val, options) - - if locks.size < resources - add_lock(locks, token) - - newval = serialize_locks(locks) - - if set(key, newval, cas, options) - acquisition_token = token - break - end - end + if val.nil? + set_initial(key) + next end - acquisition_token - end + locks = deserialize_and_clear_locks(val) - def locked?(key, resources = 1, options = {}) - locks(key, options).size >= resources - end + refresh_lock(locks, acquisition_token) - def locks(key, options) - options = merge_defaults(options) - val, _ = get(key, options) - locks = deserialize_locks(val) - - locks + break if set(key, serialize_locks(locks), cas) end + end - def refresh(key, acquisition_token, options = {}) - options = merge_defaults(options) + def unlock(key, acquisition_token) + return unless acquisition_token - retry_with_timeout(key, options) do - val, cas = get(key, options) + retry_with_timeout(key) do + val, cas = get(key) - if val.nil? - set_initial(key, options) - next - end + break if val.nil? - locks = deserialize_and_clear_locks(val, options) + locks = deserialize_and_clear_locks(val) - refresh_lock(locks, acquisition_token) + acquisition_lock = remove_lock(locks, acquisition_token) - break if set(key, serialize_locks(locks), cas, options) - end + break unless acquisition_lock + break if set(key, serialize_locks(locks), cas) end + rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions + # ignore - assume success due to optimistic locking + end - def unlock(key, acquisition_token, options = {}) - options = merge_defaults(options) + def clear(key) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end - return unless acquisition_token + private - retry_with_timeout(key, options) do - val, cas = get(key, options) + def acquire_lock(key, resources = 1) + acquisition_token = nil + token = SecureRandom.base64(16) - break if val.nil? + retry_with_timeout(key) do + val, cas = get(key) - locks = deserialize_and_clear_locks(val, options) - - acquisition_lock = remove_lock(locks, acquisition_token) - - break unless acquisition_lock - break if set(key, serialize_locks(locks), cas, options) + if val.nil? + set_initial(key) + next end - rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions - # ignore - assume success due to optimistic locking - end - def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError - end + locks = deserialize_and_clear_locks(val) - def merge_defaults(options = {}) - options = self::DEFAULT_OPTIONS.merge(options) + if locks.size < resources + add_lock(locks, token) - fail "Client required" unless options[:client] + newval = serialize_locks(locks) - options + if set(key, newval, cas) + acquisition_token = token + break + end + end end - private + acquisition_token + end - def get(key, options) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError - end + def get(key) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end - def set(key, newval, oldval, options) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError - end + def set(key, newval, oldval) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end - def set_initial(key, options) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError - end + def set_initial(key) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end - def synchronize(key, options) - yield(key, options) - end + def synchronize(key) # rubocop:disable Lint/UnusedMethodArgument + mon_synchronize { yield } + end - def retry_with_timeout(key, options) - count = (options[:retry_timeout] / options[:retry_delay].to_f).ceil + def retry_with_timeout(key) + start = Time.now.to_f - start = Time.now.to_f + @retry_count.times do + now = Time.now.to_f + break if now - start > @options[:acquisition_timeout] - count.times do - now = Time.now.to_f - break if now - start > options[:retry_timeout] - - synchronize(key, options) do - yield - end - - sleep(rand(options[:retry_delay] * 1000).to_f / 1000) + synchronize(key) do + yield end - rescue => _ - raise LockClientError - end - def serialize_locks(locks) - MessagePack.pack(locks.map { |time, token| [time.to_f, token] }) + sleep(rand(@options[:acquisition_delay] * 1000).to_f / 1000) end + rescue => _ + raise LockClientError + end - def deserialize_and_clear_locks(val, options) - clear_expired_locks(deserialize_locks(val), options) - end + def serialize_locks(locks) + MessagePack.pack(locks.map { |time, token| [time.to_f, token] }) + end - def deserialize_locks(val) - unpacked = (val.nil? || val == "") ? [] : MessagePack.unpack(val) + def deserialize_and_clear_locks(val) + clear_expired_locks(deserialize_locks(val)) + end - unpacked.map do |time, token| - [Time.at(time), token] - end - rescue EOFError => _ - [] - end + def deserialize_locks(val) + unpacked = (val.nil? || val == "") ? [] : MessagePack.unpack(val) - def clear_expired_locks(locks, options) - expired = Time.now - options[:stale_lock_expiration] - locks.reject { |time, _| time < expired } + unpacked.map do |time, token| + [Time.at(time), token] end + rescue EOFError => _ + [] + end - def add_lock(locks, token) - locks << [Time.now.to_f, token] - end + def clear_expired_locks(locks) + expired = Time.now - @options[:stale_lock_expiration] + locks.reject { |time, _| time < expired } + end - def remove_lock(locks, acquisition_token) - lock = locks.find { |_, token| token == acquisition_token } - locks.delete(lock) - end + def add_lock(locks, token) + locks << [Time.now.to_f, token] + end - def refresh_lock(locks, acquisition_token) - remove_lock(locks, acquisition_token) - add_lock(locks, token) - end + def remove_lock(locks, acquisition_token) + lock = locks.find { |_, token| token == acquisition_token } + locks.delete(lock) + end + + def refresh_lock(locks, acquisition_token) + remove_lock(locks, acquisition_token) + add_lock(locks, token) end end end end