lib/redstream/lock.rb in redstream-0.5.0 vs lib/redstream/lock.rb in redstream-0.6.0

- old
+ new

@@ -19,44 +19,71 @@ # # sleep(5) unless got_lock # end class Lock + class Signal + def initialize + @mutex = Mutex.new + @condition_variable = ConditionVariable.new + end + + def wait(timeout) + @mutex.synchronize { @condition_variable.wait(@mutex, timeout) } + end + + def signal + @condition_variable.signal + end + end + def initialize(name:) @name = name @id = SecureRandom.hex end def acquire(&block) got_lock = get_lock - keep_lock(&block) if got_lock + + if got_lock + keep_lock(&block) + release_lock + end + got_lock end + def wait(timeout) + @wait_redis ||= Redstream.connection_pool.with(&:dup) + @wait_redis.brpop("#{Redstream.lock_key_name(@name)}.notify", timeout: timeout) + end + private def keep_lock(&block) - stop = false - mutex = Mutex.new + stopped = false + signal = Signal.new - Thread.new do - until mutex.synchronize { stop } - Redstream.connection_pool.with { |redis| redis.expire(Redstream.lock_key_name(@name), 5) } + thread = Thread.new do + until stopped + Redstream.connection_pool.with do |redis| + redis.expire(Redstream.lock_key_name(@name), 5) + end - sleep 3 + signal.wait(3) end end block.call ensure - mutex.synchronize do - stop = true - end + stopped = true + signal&.signal + thread&.join end def get_lock - @get_lock_script = <<~GET_LOCK_SCRIPT + @get_lock_script = <<~SCRIPT local lock_key_name, id = ARGV[1], ARGV[2] local cur = redis.call('get', lock_key_name) if not cur then @@ -68,11 +95,32 @@ return true end return false - GET_LOCK_SCRIPT + SCRIPT - Redstream.connection_pool.with { |redis| redis.eval(@get_lock_script, argv: [Redstream.lock_key_name(@name), @id]) } + Redstream.connection_pool.with do |redis| + redis.eval(@get_lock_script, argv: [Redstream.lock_key_name(@name), @id]) + end + end + + def release_lock + @release_lock_script = <<~SCRIPT + local lock_key_name, id = ARGV[1], ARGV[2] + + local cur = redis.call('get', lock_key_name) + + if cur and cur == id then + redis.call('del', lock_key_name) + end + + redis.call('del', lock_key_name .. '.notify') + redis.call('rpush', lock_key_name .. '.notify', '1') + SCRIPT + + Redstream.connection_pool.with do |redis| + redis.eval(@release_lock_script, argv: [Redstream.lock_key_name(@name), @id]) + end end end end