lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-0.10.0 vs lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-0.11.0
- old
+ new
@@ -17,31 +17,29 @@
# @!attribute [r] threshold
# @return [Strategy::Threshold, nil]
attr_reader :threshold
+ # @!attribute [r] observer
+ # @return [Proc, nil]
+ attr_reader :observer
+
# @param [#to_s] name
# @param [Hash] concurrency Concurrency options.
# See keyword args of {Strategy::Concurrency#initialize} for details.
# @param [Hash] threshold Threshold options.
# See keyword args of {Strategy::Threshold#initialize} for details.
# @param [#call] key_suffix Dynamic key suffix generator.
- def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil)
- key = "throttled:#{name}"
+ # @param [#call] observer Process called after throttled.
+ def initialize(
+ name,
+ concurrency: nil, threshold: nil, key_suffix: nil, observer: nil
+ )
+ @observer = observer
+ @concurrency = make_strategy(Concurrency, name, key_suffix, concurrency)
+ @threshold = make_strategy(Threshold, name, key_suffix, threshold)
- @concurrency =
- if concurrency
- concurrency[:key_suffix] ||= key_suffix
- Concurrency.new(key, **concurrency)
- end
-
- @threshold =
- if threshold
- threshold[:key_suffix] ||= key_suffix
- Threshold.new(key, **threshold)
- end
-
return if @concurrency || @threshold
raise ArgumentError, "Neither :concurrency nor :threshold given"
end
@@ -53,13 +51,17 @@
false
end
# @return [Boolean] whenever job is throttled or not.
def throttled?(jid, *job_args)
- return true if @concurrency&.throttled?(jid, *job_args)
+ if @concurrency&.throttled?(jid, *job_args)
+ @observer&.call(:concurrency, *job_args)
+ return true
+ end
if @threshold&.throttled?(*job_args)
+ @observer&.call(:threshold, *job_args)
finalize!(jid, *job_args)
return true
end
false
@@ -74,9 +76,21 @@
# Resets count of jobs of all avaliable strategies
# @return [void]
def reset!
@concurrency&.reset!
@threshold&.reset!
+ end
+
+ private
+
+ # @return [Base, nil]
+ def make_strategy(strategy, name, key_suffix, options)
+ return unless options
+
+ strategy.new("throttled:#{name}", {
+ :key_suffix => key_suffix,
+ **options
+ })
end
end
end
end