lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-0.1.0 vs lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-0.2.0
- old
+ new
@@ -1,5 +1,6 @@
+# frozen_string_literal: true
# internal
require "sidekiq/throttled/errors"
require "sidekiq/throttled/strategy/concurrency"
require "sidekiq/throttled/strategy/threshold"
@@ -13,41 +14,55 @@
# @!attribute [r] threshold
# @return [Strategy::Threshold, nil]
attr_reader :threshold
- # @param [#to_s] key
+ # @param [#to_s] name
# @param [Hash] concurrency Concurrency options.
# See {Strategy::Concurrency#initialize} for details.
# @param [Hash] threshold Threshold options.
# See {Strategy::Threshold#initialize} for details.
- def initialize(key, concurrency: nil, threshold: nil)
- base_key = "throttled:#{key}"
+ # @param [Hash] key_suffix Proc for dynamic keys.
+ def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil)
+ key = "throttled:#{name}"
- @concurrency = concurrency && Concurrency.new(base_key, concurrency)
- @threshold = threshold && Threshold.new(base_key, threshold)
+ if concurrency
+ @concurrency = Concurrency.new(
+ key, concurrency.merge(:key_suffix => key_suffix)
+ )
+ end
+ if threshold
+ @threshold = Threshold.new(
+ key, threshold.merge(:key_suffix => key_suffix)
+ )
+ end
return if @concurrency || @threshold
- fail ArgumentError, "Neither :concurrency nor :threshold given"
+ raise ArgumentError, "Neither :concurrency nor :threshold given"
end
+ def dynamic_keys?
+ (@concurrency && @concurrency.dynamic_keys?) ||
+ (@threshold && @threshold.dynamic_keys?)
+ end
+
# @return [Boolean] whenever job is throttled or not.
- def throttled?(jid)
- return true if @concurrency && @concurrency.throttled?(jid)
+ def throttled?(jid, *job_args)
+ return true if @concurrency && @concurrency.throttled?(jid, *job_args)
- if @threshold && @threshold.throttled?
- finalize! jid
+ if @threshold && @threshold.throttled?(*job_args)
+ finalize!(jid, *job_args)
return true
end
false
end
# Marks job as being processed.
# @return [void]
- def finalize!(jid)
- @concurrency && @concurrency.finalize!(jid)
+ def finalize!(jid, *job_args)
+ @concurrency && @concurrency.finalize!(jid, *job_args)
end
# Resets count of jobs of all avaliable strategies
# @return [void]
def reset!