lib/sidekiq/throttled/strategy/concurrency.rb in sidekiq-throttled-0.3.2 vs lib/sidekiq/throttled/strategy/concurrency.rb in sidekiq-throttled-0.4.0
- old
+ new
@@ -15,34 +15,37 @@
# PUSH(@key, @jid)
# return 0
SCRIPT = Script.new File.read "#{__dir__}/concurrency.lua"
private_constant :SCRIPT
- # @!attribute [r] limit
- # @return [Integer] Amount of allwoed concurrent job processors
- attr_reader :limit
-
# @param [#to_s] strategy_key
# @param [Hash] opts
- # @option opts [#to_i] :limit Amount of allwoed concurrent jobs
+ # @option opts [#to_i] :limit Amount of allowed concurrent jobs
# processors running for given key
# @option opts [#to_i] :ttl (15 minutes) Concurrency lock TTL
# in seconds
# @option opts :key_suffix Proc for dynamic key suffix.
def initialize(strategy_key, opts)
- @base_key = "#{strategy_key}:concurrency".freeze
- @limit = opts.fetch(:limit).to_i
- @ttl = opts.fetch(:ttl, 900).to_i
+ @base_key = "#{strategy_key}:concurrency".freeze
+ @limit = opts.fetch(:limit)
+ @ttl = opts.fetch(:ttl, 900).to_i
@key_suffix = opts[:key_suffix]
end
- def dynamic_keys?
- @key_suffix
+ # @return [Integer] Amount of allowed concurrent job processors
+ def limit(job_args = nil)
+ return @limit.to_i unless @limit.respond_to? :call
+ @limit.call(*job_args).to_i
end
+ # @return [Boolean] Whenever strategy has dynamic config
+ def dynamic?
+ @key_suffix || @limit.respond_to?(:call)
+ end
+
# @return [Boolean] whenever job is throttled or not
def throttled?(jid, *job_args)
- 1 == SCRIPT.eval([key(job_args)], [@limit, @ttl, jid.to_s])
+ 1 == SCRIPT.eval([key(job_args)], [limit(job_args), @ttl, jid.to_s])
end
# @return [Integer] Current count of jobs
def count(*job_args)
Sidekiq.redis { |conn| conn.scard(key(job_args)) }.to_i