lib/sidekiq/throttled/strategy/concurrency.rb in sidekiq-throttled-0.3.2 vs lib/sidekiq/throttled/strategy/concurrency.rb in sidekiq-throttled-0.4.0

- old
+ new

@@ -15,34 +15,37 @@ # PUSH(@key, @jid) # return 0 SCRIPT = Script.new File.read "#{__dir__}/concurrency.lua" private_constant :SCRIPT - # @!attribute [r] limit - # @return [Integer] Amount of allwoed concurrent job processors - attr_reader :limit - # @param [#to_s] strategy_key # @param [Hash] opts - # @option opts [#to_i] :limit Amount of allwoed concurrent jobs + # @option opts [#to_i] :limit Amount of allowed concurrent jobs # processors running for given key # @option opts [#to_i] :ttl (15 minutes) Concurrency lock TTL # in seconds # @option opts :key_suffix Proc for dynamic key suffix. def initialize(strategy_key, opts) - @base_key = "#{strategy_key}:concurrency".freeze - @limit = opts.fetch(:limit).to_i - @ttl = opts.fetch(:ttl, 900).to_i + @base_key = "#{strategy_key}:concurrency".freeze + @limit = opts.fetch(:limit) + @ttl = opts.fetch(:ttl, 900).to_i @key_suffix = opts[:key_suffix] end - def dynamic_keys? - @key_suffix + # @return [Integer] Amount of allowed concurrent job processors + def limit(job_args = nil) + return @limit.to_i unless @limit.respond_to? :call + @limit.call(*job_args).to_i end + # @return [Boolean] Whenever strategy has dynamic config + def dynamic? + @key_suffix || @limit.respond_to?(:call) + end + # @return [Boolean] whenever job is throttled or not def throttled?(jid, *job_args) - 1 == SCRIPT.eval([key(job_args)], [@limit, @ttl, jid.to_s]) + 1 == SCRIPT.eval([key(job_args)], [limit(job_args), @ttl, jid.to_s]) end # @return [Integer] Current count of jobs def count(*job_args) Sidekiq.redis { |conn| conn.scard(key(job_args)) }.to_i