Sha256: 48308ac4d96b120532a411b526c87b06a2f74902f64f33d129aa9fca27743965

Contents?: true

Size: 1.37 KB

Versions: 48

Compression:

Stored size: 1.37 KB

Contents

# frozen_string_literal: true

module Karafka
  module Connection
    # Registry of pause trackers, organised per topic and then per partition.
    # Trackers are created lazily on first access and reused afterwards, so there is a single
    # tracker for any given topic partition within one manager instance.
    class PausesManager
      # Builds an empty registry. Looking up a topic that was not seen before stores (and
      # returns) a fresh, empty partitions hash for that topic.
      def initialize
        @pauses = Hash.new { |registry, topic| registry[topic] = {} }
      end

      # Returns the pause tracker assigned to the given topic partition, building and storing it
      # on the first request. The pause settings are read from the topic only when a new tracker
      # has to be built.
      #
      # @param topic [::Karafka::Routing::Topic] topic
      # @param partition [Integer] partition number
      # @return [Karafka::TimeTrackers::Pause] pause tracker instance
      def fetch(topic, partition)
        trackers = @pauses[topic]
        tracker = trackers[partition]

        return tracker if tracker

        trackers[partition] = TimeTrackers::Pause.new(
          timeout: topic.pause_timeout,
          max_timeout: topic.pause_max_timeout,
          exponential_backoff: topic.pause_with_exponential_backoff
        )
      end

      # Walks over all the known trackers and resumes those that are both paused and expired.
      # Each resumed topic partition is yielded to the provided block right after its tracker
      # got resumed.
      #
      # @yieldparam [Karafka::Routing::Topic] topic
      # @yieldparam [Integer] partition number
      def resume
        @pauses.each_pair do |topic, trackers|
          trackers.each_pair do |partition, tracker|
            # Expiration is checked only for trackers that report being paused
            next unless tracker.paused? && tracker.expired?

            tracker.resume

            yield topic, partition
          end
        end
      end
    end
  end
end

Version data entries

48 entries across 48 versions & 1 rubygems

Version Path
karafka-2.4.17 lib/karafka/connection/pauses_manager.rb
karafka-2.4.16 lib/karafka/connection/pauses_manager.rb
karafka-2.4.15 lib/karafka/connection/pauses_manager.rb
karafka-2.4.14 lib/karafka/connection/pauses_manager.rb
karafka-2.4.13 lib/karafka/connection/pauses_manager.rb
karafka-2.4.12 lib/karafka/connection/pauses_manager.rb
karafka-2.4.11 lib/karafka/connection/pauses_manager.rb
karafka-2.4.10 lib/karafka/connection/pauses_manager.rb
karafka-2.4.9 lib/karafka/connection/pauses_manager.rb
karafka-2.4.8 lib/karafka/connection/pauses_manager.rb
karafka-2.4.7 lib/karafka/connection/pauses_manager.rb
karafka-2.4.6 lib/karafka/connection/pauses_manager.rb
karafka-2.4.5 lib/karafka/connection/pauses_manager.rb
karafka-2.4.4 lib/karafka/connection/pauses_manager.rb
karafka-2.4.3 lib/karafka/connection/pauses_manager.rb
karafka-2.4.0 lib/karafka/connection/pauses_manager.rb
karafka-2.4.0.rc1 lib/karafka/connection/pauses_manager.rb
karafka-2.3.4 lib/karafka/connection/pauses_manager.rb
karafka-2.4.0.beta2 lib/karafka/connection/pauses_manager.rb
karafka-2.4.0.beta1 lib/karafka/connection/pauses_manager.rb