Sha256: 39031756c9b68d7eac0a247e9d80454b00996097c5b454f7e65a2aa00819d7ff
Contents?: true
Size: 1.4 KB
Versions: 9
Compression:
Stored size: 1.4 KB
Contents
# frozen_string_literal: true

require "concurrent"

require_relative "./expirable_set"

module Sidekiq
  module Throttled
    # @api internal
    #
    # Cooldown manager for queues. Maintains the collection of queues that
    # are to be left out of polling temporarily (for the duration of
    # {Config#cooldown_period}).
    class Cooldown
      # Builds a {Cooldown} only when {Config#cooldown_period} is set.
      #
      # @param config [Config]
      # @return [Cooldown, nil] `nil` when `config.cooldown_period` is falsy
      def self.[](config)
        return unless config.cooldown_period

        new(config)
      end

      # @param config [Config] supplies `cooldown_period` (handed to the
      #   {ExpirableSet} holding the excluded queues) and `cooldown_threshold`
      def initialize(config)
        @queues    = ExpirableSet.new(config.cooldown_period)
        @threshold = config.cooldown_threshold
        # Per-queue count of throttled notifications; an entry is removed
        # again by {#notify_admitted}.
        @tracker   = Concurrent::Map.new
      end

      # Records that a job fetched from the given queue was throttled.
      #
      # The queue's counter starts at 1 on the first notification and is
      # incremented on each subsequent one. As soon as the threshold does not
      # exceed the counter, the queue is added to the excluded set. The counter
      # is not reset here; within this class only {#notify_admitted} clears it.
      #
      # @param queue [String]
      # @return [void]
      def notify_throttled(queue)
        throttled_count = @tracker.merge_pair(queue, 1) { |seen| seen.succ }
        return unless @threshold <= throttled_count

        @queues.add(queue)
      end

      # Records that a job fetched from the given queue was not throttled,
      # dropping the queue's throttled-notification counter.
      #
      # @param queue [String]
      # @return [void]
      def notify_admitted(queue)
        @tracker.delete(queue)
      end

      # Queues that are currently excluded from polling.
      #
      # @return [Array<String>]
      def queues
        @queues.to_a
      end
    end
  end
end
Version data entries
9 entries across 9 versions & 1 rubygems