lib/suo/client/base.rb in suo-0.2.3 vs lib/suo/client/base.rb in suo-0.3.0
- old
+ new
@@ -2,145 +2,150 @@
module Client
class Base
DEFAULT_OPTIONS = {
acquisition_timeout: 0.1,
acquisition_delay: 0.01,
- stale_lock_expiration: 3600
+ stale_lock_expiration: 3600,
+ resources: 1
}.freeze
- attr_accessor :client
+ attr_accessor :client, :key, :resources, :options
include MonitorMixin
- def initialize(options = {})
+ def initialize(key, options = {})
fail "Client required" unless options[:client]
@options = DEFAULT_OPTIONS.merge(options)
@retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil
@client = @options[:client]
- super()
+ @resources = @options[:resources].to_i
+ @key = key
+ super() # initialize Monitor mixin for thread safety
end
- def lock(key, resources = 1)
- token = acquire_lock(key, resources)
+ def lock
+ token = acquire_lock
if block_given? && token
begin
- yield(token)
+ yield
ensure
- unlock(key, token)
+ unlock(token)
end
else
token
end
end
- def locked?(key, resources = 1)
- locks(key).size >= resources
+ def locked?
+ locks.size >= resources
end
- def locks(key)
- val, _ = get(key)
+ def locks
+ val, _ = get
cleared_locks = deserialize_and_clear_locks(val)
cleared_locks
end
- def refresh(key, acquisition_token)
- retry_with_timeout(key) do
- val, cas = get(key)
+ def refresh(token)
+ retry_with_timeout do
+ val, cas = get
if val.nil?
- initial_set(key)
+ initial_set
next
end
cleared_locks = deserialize_and_clear_locks(val)
- refresh_lock(cleared_locks, acquisition_token)
+ refresh_lock(cleared_locks, token)
- break if set(key, serialize_locks(cleared_locks), cas)
+ break if set(serialize_locks(cleared_locks), cas)
end
end
- def unlock(key, acquisition_token)
- return unless acquisition_token
+ def unlock(token)
+ return unless token
- retry_with_timeout(key) do
- val, cas = get(key)
+ retry_with_timeout do
+ val, cas = get
break if val.nil?
cleared_locks = deserialize_and_clear_locks(val)
- acquisition_lock = remove_lock(cleared_locks, acquisition_token)
+ acquisition_lock = remove_lock(cleared_locks, token)
break unless acquisition_lock
- break if set(key, serialize_locks(cleared_locks), cas)
+ break if set(serialize_locks(cleared_locks), cas)
end
rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions
# ignore - assume success due to optimistic locking
end
- def clear(key) # rubocop:disable Lint/UnusedMethodArgument
+ def clear
fail NotImplementedError
end
private
- def acquire_lock(key, resources = 1)
+ attr_accessor :retry_count
+
+ def acquire_lock
token = SecureRandom.base64(16)
- retry_with_timeout(key) do
- val, cas = get(key)
+ retry_with_timeout do
+ val, cas = get
if val.nil?
- initial_set(key)
+ initial_set
next
end
cleared_locks = deserialize_and_clear_locks(val)
if cleared_locks.size < resources
add_lock(cleared_locks, token)
newval = serialize_locks(cleared_locks)
- return token if set(key, newval, cas)
+ return token if set(newval, cas)
end
end
nil
end
- def get(key) # rubocop:disable Lint/UnusedMethodArgument
+ def get
fail NotImplementedError
end
- def set(key, newval, cas) # rubocop:disable Lint/UnusedMethodArgument
+ def set(newval, cas) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
- def initial_set(key, val = "") # rubocop:disable Lint/UnusedMethodArgument
+ def initial_set(val = "") # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
- def synchronize(key) # rubocop:disable Lint/UnusedMethodArgument
+ def synchronize
mon_synchronize { yield }
end
- def retry_with_timeout(key)
+ def retry_with_timeout
start = Time.now.to_f
- @retry_count.times do
+ retry_count.times do
elapsed = Time.now.to_f - start
- break if elapsed >= @options[:acquisition_timeout]
+ break if elapsed >= options[:acquisition_timeout]
- synchronize(key) do
+ synchronize do
yield
end
- sleep(rand(@options[:acquisition_delay] * 1000).to_f / 1000)
+ sleep(rand(options[:acquisition_delay] * 1000).to_f / 1000)
end
rescue => _
raise LockClientError
end
@@ -161,10 +166,10 @@
rescue EOFError, MessagePack::MalformedFormatError => _
[]
end
def clear_expired_locks(locks)
- expired = Time.now - @options[:stale_lock_expiration]
+ expired = Time.now - options[:stale_lock_expiration]
locks.reject { |time, _| time < expired }
end
def add_lock(locks, token, time = Time.now.to_f)
locks << [time, token]