lib/suo/client/base.rb in suo-0.1.3 vs lib/suo/client/base.rb in suo-0.2.0
- old
+ new
@@ -1,207 +1,189 @@
module Suo
module Client
class Base
-
DEFAULT_OPTIONS = {
- retry_timeout: 0.1,
- retry_delay: 0.01,
+ acquisition_timeout: 0.1,
+ acquisition_delay: 0.01,
stale_lock_expiration: 3600
}.freeze
+ attr_accessor :client
+
+ include MonitorMixin
+
def initialize(options = {})
- @options = self.class.merge_defaults(options)
+ fail "Client required" unless options[:client]
+ @options = DEFAULT_OPTIONS.merge(options)
+ @retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil
+ @client = @options[:client]
+ super()
end
- def lock(key, resources = 1, options = {})
- options = self.class.merge_defaults(@options.merge(options))
- token = self.class.lock(key, resources, options)
+ def lock(key, resources = 1)
+ token = acquire_lock(key, resources)
- if token
+ if block_given? && token
begin
- yield if block_given?
+ yield
ensure
- self.class.unlock(key, token, options)
+ unlock(key, token)
end
-
- true
else
- false
+ token
end
end
def locked?(key, resources = 1)
- self.class.locked?(key, resources, @options)
+ locks(key).size >= resources
end
- class << self
- def lock(key, resources = 1, options = {})
- options = merge_defaults(options)
- acquisition_token = nil
- token = SecureRandom.base64(16)
+ def locks(key)
+ val, _ = get(key)
+ locks = deserialize_locks(val)
- retry_with_timeout(key, options) do
- val, cas = get(key, options)
+ locks
+ end
- if val.nil?
- set_initial(key, options)
- next
- end
+ def refresh(key, acquisition_token)
+ retry_with_timeout(key) do
+ val, cas = get(key)
- locks = deserialize_and_clear_locks(val, options)
-
- if locks.size < resources
- add_lock(locks, token)
-
- newval = serialize_locks(locks)
-
- if set(key, newval, cas, options)
- acquisition_token = token
- break
- end
- end
+ if val.nil?
+ set_initial(key)
+ next
end
- acquisition_token
- end
+ locks = deserialize_and_clear_locks(val)
- def locked?(key, resources = 1, options = {})
- locks(key, options).size >= resources
- end
+ refresh_lock(locks, acquisition_token)
- def locks(key, options)
- options = merge_defaults(options)
- val, _ = get(key, options)
- locks = deserialize_locks(val)
-
- locks
+ break if set(key, serialize_locks(locks), cas)
end
+ end
- def refresh(key, acquisition_token, options = {})
- options = merge_defaults(options)
+ def unlock(key, acquisition_token)
+ return unless acquisition_token
- retry_with_timeout(key, options) do
- val, cas = get(key, options)
+ retry_with_timeout(key) do
+ val, cas = get(key)
- if val.nil?
- set_initial(key, options)
- next
- end
+ break if val.nil?
- locks = deserialize_and_clear_locks(val, options)
+ locks = deserialize_and_clear_locks(val)
- refresh_lock(locks, acquisition_token)
+ acquisition_lock = remove_lock(locks, acquisition_token)
- break if set(key, serialize_locks(locks), cas, options)
- end
+ break unless acquisition_lock
+ break if set(key, serialize_locks(locks), cas)
end
+ rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions
+ # ignore - assume success due to optimistic locking
+ end
- def unlock(key, acquisition_token, options = {})
- options = merge_defaults(options)
+ def clear(key) # rubocop:disable Lint/UnusedMethodArgument
+ fail NotImplementedError
+ end
- return unless acquisition_token
+ private
- retry_with_timeout(key, options) do
- val, cas = get(key, options)
+ def acquire_lock(key, resources = 1)
+ acquisition_token = nil
+ token = SecureRandom.base64(16)
- break if val.nil?
+ retry_with_timeout(key) do
+ val, cas = get(key)
- locks = deserialize_and_clear_locks(val, options)
-
- acquisition_lock = remove_lock(locks, acquisition_token)
-
- break unless acquisition_lock
- break if set(key, serialize_locks(locks), cas, options)
+ if val.nil?
+ set_initial(key)
+ next
end
- rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions
- # ignore - assume success due to optimistic locking
- end
- def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument
- fail NotImplementedError
- end
+ locks = deserialize_and_clear_locks(val)
- def merge_defaults(options = {})
- options = self::DEFAULT_OPTIONS.merge(options)
+ if locks.size < resources
+ add_lock(locks, token)
- fail "Client required" unless options[:client]
+ newval = serialize_locks(locks)
- options
+ if set(key, newval, cas)
+ acquisition_token = token
+ break
+ end
+ end
end
- private
+ acquisition_token
+ end
- def get(key, options) # rubocop:disable Lint/UnusedMethodArgument
- fail NotImplementedError
- end
+ def get(key) # rubocop:disable Lint/UnusedMethodArgument
+ fail NotImplementedError
+ end
- def set(key, newval, oldval, options) # rubocop:disable Lint/UnusedMethodArgument
- fail NotImplementedError
- end
+ def set(key, newval, oldval) # rubocop:disable Lint/UnusedMethodArgument
+ fail NotImplementedError
+ end
- def set_initial(key, options) # rubocop:disable Lint/UnusedMethodArgument
- fail NotImplementedError
- end
+ def set_initial(key) # rubocop:disable Lint/UnusedMethodArgument
+ fail NotImplementedError
+ end
- def synchronize(key, options)
- yield(key, options)
- end
+ def synchronize(key) # rubocop:disable Lint/UnusedMethodArgument
+ mon_synchronize { yield }
+ end
- def retry_with_timeout(key, options)
- count = (options[:retry_timeout] / options[:retry_delay].to_f).ceil
+ def retry_with_timeout(key)
+ start = Time.now.to_f
- start = Time.now.to_f
+ @retry_count.times do
+ now = Time.now.to_f
+ break if now - start > @options[:acquisition_timeout]
- count.times do
- now = Time.now.to_f
- break if now - start > options[:retry_timeout]
-
- synchronize(key, options) do
- yield
- end
-
- sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
+ synchronize(key) do
+ yield
end
- rescue => _
- raise LockClientError
- end
- def serialize_locks(locks)
- MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
+ sleep(rand(@options[:acquisition_delay] * 1000).to_f / 1000)
end
+ rescue => _
+ raise LockClientError
+ end
- def deserialize_and_clear_locks(val, options)
- clear_expired_locks(deserialize_locks(val), options)
- end
+ def serialize_locks(locks)
+ MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
+ end
- def deserialize_locks(val)
- unpacked = (val.nil? || val == "") ? [] : MessagePack.unpack(val)
+ def deserialize_and_clear_locks(val)
+ clear_expired_locks(deserialize_locks(val))
+ end
- unpacked.map do |time, token|
- [Time.at(time), token]
- end
- rescue EOFError => _
- []
- end
+ def deserialize_locks(val)
+ unpacked = (val.nil? || val == "") ? [] : MessagePack.unpack(val)
- def clear_expired_locks(locks, options)
- expired = Time.now - options[:stale_lock_expiration]
- locks.reject { |time, _| time < expired }
+ unpacked.map do |time, token|
+ [Time.at(time), token]
end
+ rescue EOFError => _
+ []
+ end
- def add_lock(locks, token)
- locks << [Time.now.to_f, token]
- end
+ def clear_expired_locks(locks)
+ expired = Time.now - @options[:stale_lock_expiration]
+ locks.reject { |time, _| time < expired }
+ end
- def remove_lock(locks, acquisition_token)
- lock = locks.find { |_, token| token == acquisition_token }
- locks.delete(lock)
- end
+ def add_lock(locks, token)
+ locks << [Time.now.to_f, token]
+ end
- def refresh_lock(locks, acquisition_token)
- remove_lock(locks, acquisition_token)
- add_lock(locks, token)
- end
+ def remove_lock(locks, acquisition_token)
+ lock = locks.find { |_, token| token == acquisition_token }
+ locks.delete(lock)
+ end
+
+ def refresh_lock(locks, acquisition_token)
+ remove_lock(locks, acquisition_token)
+ add_lock(locks, token)
end
end
end
end