Sha256: 11bce0cd67ea783e1eaca055e44933f92d1e279b397974e2707dc60371b3b000

Contents?: true

Size: 1.39 KB

Versions: 61

Compression:

Stored size: 1.39 KB

Contents

# frozen_string_literal: true

module Karafka
  module Connection
    # Partitions pauses management abstraction layer.
    # It aggregates all the pauses for all the partitions that we're working with.
    class PausesManager
      # @return [Karafka::Connection::PausesManager] pauses manager
      def initialize
        @pauses = Hash.new do |h, k|
          h[k] = {}
        end
      end

      # Creates or fetches pause tracker of a given topic partition.
      #
      # @param topic [String] topic name
      # @param partition [Integer] partition number
      # @return [Karafka::TimeTrackers::Pause] pause tracker instance
      def fetch(topic, partition)
        @pauses[topic][partition] ||= TimeTrackers::Pause.new(
          timeout: Karafka::App.config.pause_timeout,
          max_timeout: Karafka::App.config.pause_max_timeout,
          exponential_backoff: Karafka::App.config.pause_with_exponential_backoff
        )
      end

      # Resumes processing of partitions for which pause time has ended.
      #
      # @yieldparam [String] topic name
      # @yieldparam [Integer] partition number
      def resume
        @pauses.each do |topic, partitions|
          partitions.each do |partition, pause|
            next unless pause.paused?
            next unless pause.expired?

            pause.resume

            yield(topic, partition)
          end
        end
      end
    end
  end
end

Version data entries

61 entries across 61 versions & 1 rubygems

Version Path
karafka-2.1.8 lib/karafka/connection/pauses_manager.rb
karafka-2.1.7 lib/karafka/connection/pauses_manager.rb
karafka-2.1.6 lib/karafka/connection/pauses_manager.rb
karafka-2.1.5 lib/karafka/connection/pauses_manager.rb
karafka-2.1.5.beta1 lib/karafka/connection/pauses_manager.rb
karafka-2.1.4 lib/karafka/connection/pauses_manager.rb
karafka-2.1.3 lib/karafka/connection/pauses_manager.rb
karafka-2.1.2 lib/karafka/connection/pauses_manager.rb
karafka-2.1.1 lib/karafka/connection/pauses_manager.rb
karafka-2.1.0 lib/karafka/connection/pauses_manager.rb
karafka-2.0.41 lib/karafka/connection/pauses_manager.rb
karafka-2.0.40 lib/karafka/connection/pauses_manager.rb
karafka-2.0.39 lib/karafka/connection/pauses_manager.rb
karafka-2.0.38 lib/karafka/connection/pauses_manager.rb
karafka-2.0.37 lib/karafka/connection/pauses_manager.rb
karafka-2.0.36 lib/karafka/connection/pauses_manager.rb
karafka-2.0.35 lib/karafka/connection/pauses_manager.rb
karafka-2.0.34 lib/karafka/connection/pauses_manager.rb
karafka-2.0.33 lib/karafka/connection/pauses_manager.rb
karafka-2.0.32 lib/karafka/connection/pauses_manager.rb