lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-0.10.0 vs lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-0.11.0

- old
+ new

@@ -17,31 +17,29 @@ # @!attribute [r] threshold # @return [Strategy::Threshold, nil] attr_reader :threshold + # @!attribute [r] observer + # @return [Proc, nil] + attr_reader :observer + # @param [#to_s] name # @param [Hash] concurrency Concurrency options. # See keyword args of {Strategy::Concurrency#initialize} for details. # @param [Hash] threshold Threshold options. # See keyword args of {Strategy::Threshold#initialize} for details. # @param [#call] key_suffix Dynamic key suffix generator. - def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil) - key = "throttled:#{name}" + # @param [#call] observer Process called after throttled. + def initialize( + name, + concurrency: nil, threshold: nil, key_suffix: nil, observer: nil + ) + @observer = observer + @concurrency = make_strategy(Concurrency, name, key_suffix, concurrency) + @threshold = make_strategy(Threshold, name, key_suffix, threshold) - @concurrency = - if concurrency - concurrency[:key_suffix] ||= key_suffix - Concurrency.new(key, **concurrency) - end - - @threshold = - if threshold - threshold[:key_suffix] ||= key_suffix - Threshold.new(key, **threshold) - end - return if @concurrency || @threshold raise ArgumentError, "Neither :concurrency nor :threshold given" end @@ -53,13 +51,17 @@ false end # @return [Boolean] whenever job is throttled or not. def throttled?(jid, *job_args) - return true if @concurrency&.throttled?(jid, *job_args) + if @concurrency&.throttled?(jid, *job_args) + @observer&.call(:concurrency, *job_args) + return true + end if @threshold&.throttled?(*job_args) + @observer&.call(:threshold, *job_args) finalize!(jid, *job_args) return true end false @@ -74,9 +76,21 @@ # Resets count of jobs of all avaliable strategies # @return [void] def reset! @concurrency&.reset! @threshold&.reset! + end + + private + + # @return [Base, nil] + def make_strategy(strategy, name, key_suffix, options) + return unless options + + strategy.new("throttled:#{name}", { + :key_suffix => key_suffix, + **options + }) end end end end