lib/redstream/lock.rb in redstream-0.5.0 vs lib/redstream/lock.rb in redstream-0.6.0
- old
+ new
@@ -19,44 +19,71 @@
#
# sleep(5) unless got_lock
# end
class Lock
+ class Signal
+ def initialize
+ @mutex = Mutex.new
+ @condition_variable = ConditionVariable.new
+ end
+
+ def wait(timeout)
+ @mutex.synchronize { @condition_variable.wait(@mutex, timeout) }
+ end
+
+ def signal
+ @condition_variable.signal
+ end
+ end
+
def initialize(name:)
@name = name
@id = SecureRandom.hex
end
def acquire(&block)
got_lock = get_lock
- keep_lock(&block) if got_lock
+
+ if got_lock
+ keep_lock(&block)
+ release_lock
+ end
+
got_lock
end
+ def wait(timeout)
+ @wait_redis ||= Redstream.connection_pool.with(&:dup)
+ @wait_redis.brpop("#{Redstream.lock_key_name(@name)}.notify", timeout: timeout)
+ end
+
private
def keep_lock(&block)
- stop = false
- mutex = Mutex.new
+ stopped = false
+ signal = Signal.new
- Thread.new do
- until mutex.synchronize { stop }
- Redstream.connection_pool.with { |redis| redis.expire(Redstream.lock_key_name(@name), 5) }
+ thread = Thread.new do
+ until stopped
+ Redstream.connection_pool.with do |redis|
+ redis.expire(Redstream.lock_key_name(@name), 5)
+ end
- sleep 3
+ signal.wait(3)
end
end
block.call
ensure
- mutex.synchronize do
- stop = true
- end
+ stopped = true
+ signal&.signal
+ thread&.join
end
def get_lock
- @get_lock_script = <<~GET_LOCK_SCRIPT
+ @get_lock_script = <<~SCRIPT
local lock_key_name, id = ARGV[1], ARGV[2]
local cur = redis.call('get', lock_key_name)
if not cur then
@@ -68,11 +95,32 @@
return true
end
return false
- GET_LOCK_SCRIPT
+ SCRIPT
- Redstream.connection_pool.with { |redis| redis.eval(@get_lock_script, argv: [Redstream.lock_key_name(@name), @id]) }
+ Redstream.connection_pool.with do |redis|
+ redis.eval(@get_lock_script, argv: [Redstream.lock_key_name(@name), @id])
+ end
+ end
+
+ def release_lock
+ @release_lock_script = <<~SCRIPT
+ local lock_key_name, id = ARGV[1], ARGV[2]
+
+ local cur = redis.call('get', lock_key_name)
+
+ if cur and cur == id then
+ redis.call('del', lock_key_name)
+ end
+
+ redis.call('del', lock_key_name .. '.notify')
+ redis.call('rpush', lock_key_name .. '.notify', '1')
+ SCRIPT
+
+ Redstream.connection_pool.with do |redis|
+ redis.eval(@release_lock_script, argv: [Redstream.lock_key_name(@name), @id])
+ end
end
end
end