lib/lusnoc/mutex.rb in lusnoc-0.1.2.16550 vs lib/lusnoc/mutex.rb in lusnoc-0.1.2.16562
- old
+ new
@@ -44,11 +44,11 @@
end
return acquisition_loop! key, session, value, t, &block
ensure
release(key, session.id, timeout: 2) rescue nil
- logger.info("Lock #{key} released for session #{session.name}[#{session.id}]")
+ logger.info("Mutex[#{key}] released for Session[#{session.name}:#{session.id}]")
@owner = nil
@session = nil
end
end
@@ -57,35 +57,57 @@
def acquire(key, session, value)
resp = Lusnoc.http_put(build_url("/v1/kv/#{key}?acquire=#{session.id}"), value, timeout: 1)
return false if resp.body.chomp != 'true'
@owner = Thread.current
- logger.info("Lock #{key} acquired for session #{session.name}[#{session.id}]")
+ logger.info("Mutex[#{key}] acquired for Session[#{session.name}:#{session.id}]")
renew
true
end
def release(key, session)
Lusnoc.http_put(build_url("/v1/kv/#{key}?release=#{session.id}"), timeout: 1)
end
def acquisition_loop!(key, session, value, t)
- return yield(self) if acquire(key, session, value)
+ if acquire(key, session, value)
+ prepare_guard(session, key).run do
+ return yield(self)
+ end
+ end
- logger.debug("Start #{key} acquisition loop for session #{session.name}[#{session.id}]")
+ logger.debug("Mutex[#{key}] run acquisition loop for Session[#{session.name}:#{session.id}]")
t.loop! do
session.alive!(TimeoutError)
wait_for_key_released(key, t.left)
- return yield(self) if acquire(key, session, value)
+ if acquire(key, session, value)
+ prepare_guard(session, key).run do
+ return yield(self)
+ end
+ end
- logger.debug("Lock #{key} acquisition failed for session #{session.name}[#{session.id}]")
- sleep 1
+ logger.debug("Mutex[#{key}] acquisition failed for Session[#{session.name}:#{session.id}]")
+ sleep 0.4
end
end
+ def prepare_guard(session, key)
+ Lusnoc::Guard.new(build_url("/v1/kv/#{key}")) do |guard|
+ guard.condition do |body|
+ JSON.parse(body).first['Session'] == session.id rescue false
+ end
+
+ guard.then do
+ @owner = nil
+ logger.info("Mutex[#{key}] LOST for Session[#{session.name}:#{session.id}]")
+ @on_mutex_lost&.call(self)
+ end
+ end
+ end
+
def wait_for_key_released(key, timeout = nil)
- logger.debug "Waiting for key #{key} to be fre of any session"
+ logger.debug("Mutex[#{key}] start waiting of key releasing...")
Lusnoc::Watcher.new(build_url("/v1/kv/#{key}"),
timeout: timeout,
eclass: TimeoutError,
emessage: 'mutex acquisition expired').run do |body|
result = JSON.parse(body.empty? ? '[{}]' : body)