lib/sidekiq/limit_fetch/global/selector.rb in sidekiq-limit_fetch-4.4.0 vs lib/sidekiq/limit_fetch/global/selector.rb in sidekiq-limit_fetch-4.4.1
- old
+ new
@@ -1,54 +1,57 @@
-module Sidekiq::LimitFetch::Global
- module Selector
- extend self
+# frozen_string_literal: true
- MUTEX_FOR_UUID = Mutex.new
+module Sidekiq
+ module LimitFetch
+ module Global
+ module Selector
+ extend self
- def acquire(queues, namespace)
- redis_eval :acquire, [namespace, uuid, queues]
- end
+ MUTEX_FOR_UUID = Mutex.new
- def release(queues, namespace)
- redis_eval :release, [namespace, uuid, queues]
- end
+ def acquire(queues, namespace)
+ redis_eval :acquire, [namespace, uuid, queues]
+ end
- def uuid
- # - if we'll remove "@uuid ||=" from inside of mutex
- # then @uuid can be overwritten
- # - if we'll remove "@uuid ||=" from outside of mutex
- # then each read will lead to mutex
- @uuid ||= MUTEX_FOR_UUID.synchronize { @uuid || SecureRandom.uuid }
- end
+ def release(queues, namespace)
+ redis_eval :release, [namespace, uuid, queues]
+ end
- private
+ def uuid
+ # - if we'll remove "@uuid ||=" from inside of mutex
+ # then @uuid can be overwritten
+ # - if we'll remove "@uuid ||=" from outside of mutex
+ # then each read will lead to mutex
+ @uuid ||= MUTEX_FOR_UUID.synchronize { @uuid || SecureRandom.uuid }
+ end
- def redis_eval(script_name, args)
- Sidekiq.redis do |it|
- begin
- it.evalsha send("redis_#{script_name}_sha"), [], args
- rescue Sidekiq::LimitFetch::RedisCommandError => error
- raise unless error.message.include? 'NOSCRIPT'
- if Sidekiq::LimitFetch.post_7?
- it.eval send("redis_#{script_name}_script"), 0, *args
- else
- it.eval send("redis_#{script_name}_script"), argv: args
+ private
+
+ def redis_eval(script_name, args)
+ Sidekiq.redis do |it|
+ it.evalsha send("redis_#{script_name}_sha"), [], args
+ rescue Sidekiq::LimitFetch::RedisCommandError => e
+ raise unless e.message.include? 'NOSCRIPT'
+
+ if Sidekiq::LimitFetch.post_7?
+ it.eval send("redis_#{script_name}_script"), 0, *args
+ else
+ it.eval send("redis_#{script_name}_script"), argv: args
+ end
end
end
- end
- end
- def redis_acquire_sha
- @acquire_sha ||= OpenSSL::Digest::SHA1.hexdigest redis_acquire_script
- end
+ def redis_acquire_sha
+ @redis_acquire_sha ||= OpenSSL::Digest::SHA1.hexdigest redis_acquire_script
+ end
- def redis_release_sha
- @release_sha ||= OpenSSL::Digest::SHA1.hexdigest redis_release_script
- end
+ def redis_release_sha
+ @redis_release_sha ||= OpenSSL::Digest::SHA1.hexdigest redis_release_script
+ end
- def redis_acquire_script
- <<-LUA
+ def redis_acquire_script
+ <<-LUA
local namespace = table.remove(ARGV, 1)..'limit_fetch:'
local worker_name = table.remove(ARGV, 1)
local queues = ARGV
local available = {}
local unblocked = {}
@@ -108,22 +111,24 @@
end
end
end
return available
- LUA
- end
+ LUA
+ end
- def redis_release_script
- <<-LUA
+ def redis_release_script
+ <<-LUA
local namespace = table.remove(ARGV, 1)..'limit_fetch:'
local worker_name = table.remove(ARGV, 1)
local queues = ARGV
for _, queue in ipairs(queues) do
local probed_key = namespace..'probed:'..queue
redis.call('lrem', probed_key, 1, worker_name)
end
- LUA
+ LUA
+ end
+ end
end
end
end