lib/sidekiq/limit_fetch/global/selector.rb in sidekiq-limit_fetch-4.4.0 vs lib/sidekiq/limit_fetch/global/selector.rb in sidekiq-limit_fetch-4.4.1

- old
+ new

@@ -1,54 +1,57 @@ -module Sidekiq::LimitFetch::Global - module Selector - extend self +# frozen_string_literal: true - MUTEX_FOR_UUID = Mutex.new +module Sidekiq + module LimitFetch + module Global + module Selector + extend self - def acquire(queues, namespace) - redis_eval :acquire, [namespace, uuid, queues] - end + MUTEX_FOR_UUID = Mutex.new - def release(queues, namespace) - redis_eval :release, [namespace, uuid, queues] - end + def acquire(queues, namespace) + redis_eval :acquire, [namespace, uuid, queues] + end - def uuid - # - if we'll remove "@uuid ||=" from inside of mutex - # then @uuid can be overwritten - # - if we'll remove "@uuid ||=" from outside of mutex - # then each read will lead to mutex - @uuid ||= MUTEX_FOR_UUID.synchronize { @uuid || SecureRandom.uuid } - end + def release(queues, namespace) + redis_eval :release, [namespace, uuid, queues] + end - private + def uuid + # - if we'll remove "@uuid ||=" from inside of mutex + # then @uuid can be overwritten + # - if we'll remove "@uuid ||=" from outside of mutex + # then each read will lead to mutex + @uuid ||= MUTEX_FOR_UUID.synchronize { @uuid || SecureRandom.uuid } + end - def redis_eval(script_name, args) - Sidekiq.redis do |it| - begin - it.evalsha send("redis_#{script_name}_sha"), [], args - rescue Sidekiq::LimitFetch::RedisCommandError => error - raise unless error.message.include? 'NOSCRIPT' - if Sidekiq::LimitFetch.post_7? - it.eval send("redis_#{script_name}_script"), 0, *args - else - it.eval send("redis_#{script_name}_script"), argv: args + private + + def redis_eval(script_name, args) + Sidekiq.redis do |it| + it.evalsha send("redis_#{script_name}_sha"), [], args + rescue Sidekiq::LimitFetch::RedisCommandError => e + raise unless e.message.include? 'NOSCRIPT' + + if Sidekiq::LimitFetch.post_7? + it.eval send("redis_#{script_name}_script"), 0, *args + else + it.eval send("redis_#{script_name}_script"), argv: args + end end end - end - end - def redis_acquire_sha - @acquire_sha ||= OpenSSL::Digest::SHA1.hexdigest redis_acquire_script - end + def redis_acquire_sha + @redis_acquire_sha ||= OpenSSL::Digest::SHA1.hexdigest redis_acquire_script + end - def redis_release_sha - @release_sha ||= OpenSSL::Digest::SHA1.hexdigest redis_release_script - end + def redis_release_sha + @redis_release_sha ||= OpenSSL::Digest::SHA1.hexdigest redis_release_script + end - def redis_acquire_script - <<-LUA + def redis_acquire_script + <<-LUA local namespace = table.remove(ARGV, 1)..'limit_fetch:' local worker_name = table.remove(ARGV, 1) local queues = ARGV local available = {} local unblocked = {} @@ -108,22 +111,24 @@ end end end return available - LUA - end + LUA + end - def redis_release_script - <<-LUA + def redis_release_script + <<-LUA local namespace = table.remove(ARGV, 1)..'limit_fetch:' local worker_name = table.remove(ARGV, 1) local queues = ARGV for _, queue in ipairs(queues) do local probed_key = namespace..'probed:'..queue redis.call('lrem', probed_key, 1, worker_name) end - LUA + LUA + end + end end end end