Sha256: 3878f8b063a7c6c0eba98076ae74dd1c497a0d21c8fec99650d9bf3b226e32ab

Contents?: true

Size: 1.37 KB

Versions: 6

Compression:

Stored size: 1.37 KB

Contents

# frozen_string_literal: true

module Karafka
  module Connection
    # Registry of pause trackers, grouped first by topic and then by partition.
    # A single instance keeps the pauses of every topic partition it has been asked about.
    class PausesManager
      # Builds an empty registry in which each topic lazily receives its own partitions hash.
      #
      # @return [Karafka::Connection::PausesManager] pauses manager
      def initialize
        @pauses = Hash.new { |registry, topic| registry[topic] = {} }
      end

      # Looks up the pause of the given topic partition. When there is none yet, a new one is
      # built from the application config values, remembered and returned.
      #
      # @param topic [String] topic name
      # @param partition [Integer] partition number
      # @return [Karafka::TimeTrackers::Pause] pause instance
      def fetch(topic, partition)
        topic_pauses = @pauses[topic]
        known = topic_pauses[partition]
        return known if known

        # Config is only consulted when a pause actually has to be created
        settings = Karafka::App.config

        topic_pauses[partition] = TimeTrackers::Pause.new(
          timeout: settings.pause_timeout,
          max_timeout: settings.pause_max_timeout,
          exponential_backoff: settings.pause_with_exponential_backoff
        )
      end

      # Walks over all known pauses, resumes those that are paused and already expired and
      # hands the coordinates of each resumed one to the given block.
      #
      # @yieldparam [String] topic name
      # @yieldparam [Integer] partition number
      def resume
        @pauses.each_pair do |topic, partitions|
          partitions.each_pair do |partition, pause|
            # Expiration is checked only for pauses that are currently active
            ready = pause.paused? && pause.expired?
            next unless ready

            pause.resume
            yield(topic, partition)
          end
        end
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
karafka-2.0.0.alpha6 lib/karafka/connection/pauses_manager.rb
karafka-2.0.0.alpha5 lib/karafka/connection/pauses_manager.rb
karafka-2.0.0.alpha4 lib/karafka/connection/pauses_manager.rb
karafka-2.0.0.alpha3 lib/karafka/connection/pauses_manager.rb
karafka-2.0.0.alpha2 lib/karafka/connection/pauses_manager.rb
karafka-2.0.0.alpha1 lib/karafka/connection/pauses_manager.rb