lib/lusnoc/mutex.rb in lusnoc-0.1.2.16550 vs lib/lusnoc/mutex.rb in lusnoc-0.1.2.16562

- old
+ new

@@ -44,11 +44,11 @@ end return acquisition_loop! key, session, value, t, &block ensure release(key, session.id, timeout: 2) rescue nil - logger.info("Lock #{key} released for session #{session.name}[#{session.id}]") + logger.info("Mutex[#{key}] released for Session[#{session.name}:#{session.id}]") @owner = nil @session = nil end end @@ -57,35 +57,57 @@ def acquire(key, session, value) resp = Lusnoc.http_put(build_url("/v1/kv/#{key}?acquire=#{session.id}"), value, timeout: 1) return false if resp.body.chomp != 'true' @owner = Thread.current - logger.info("Lock #{key} acquired for session #{session.name}[#{session.id}]") + logger.info("Mutex[#{key}] acquired for Session[#{session.name}:#{session.id}]") renew true end def release(key, session) Lusnoc.http_put(build_url("/v1/kv/#{key}?release=#{session.id}"), timeout: 1) end def acquisition_loop!(key, session, value, t) - return yield(self) if acquire(key, session, value) + if acquire(key, session, value) + prepare_guard(session, key).run do + return yield(self) + end + end - logger.debug("Start #{key} acquisition loop for session #{session.name}[#{session.id}]") + logger.debug("Mutex[#{key}] run acquisition loop for Session[#{session.name}:#{session.id}]") t.loop! do session.alive!(TimeoutError) wait_for_key_released(key, t.left) - return yield(self) if acquire(key, session, value) + if acquire(key, session, value) + prepare_guard(session, key).run do + return yield(self) + end + end - logger.debug("Lock #{key} acquisition failed for session #{session.name}[#{session.id}]") - sleep 1 + logger.debug("Mutex[#{key}] acquisition failed for Session[#{session.name}:#{session.id}]") + sleep 0.4 end end + def prepare_guard(session, key) + Lusnoc::Guard.new(build_url("/v1/kv/#{key}")) do |guard| + guard.condition do |body| + JSON.parse(body).first['Session'] == session.id rescue false + end + + guard.then do + @owner = nil + logger.info("Mutex[#{key}] LOST for Session[#{session.name}:#{session.id}]") + @on_mutex_lost&.call(self) + end + end + end + def wait_for_key_released(key, timeout = nil) - logger.debug "Waiting for key #{key} to be fre of any session" + logger.debug("Mutex[#{key}] start waiting of key releasing...") Lusnoc::Watcher.new(build_url("/v1/kv/#{key}"), timeout: timeout, eclass: TimeoutError, emessage: 'mutex acquisition expired').run do |body| result = JSON.parse(body.empty? ? '[{}]' : body)