Sha256: 3878f8b063a7c6c0eba98076ae74dd1c497a0d21c8fec99650d9bf3b226e32ab
Contents?: true
Size: 1.37 KB
Versions: 6
Compression:
Stored size: 1.37 KB
Contents
# frozen_string_literal: true

module Karafka
  module Connection
    # Partitions pauses management abstraction layer.
    # It aggregates all the pauses for all the partitions that we're working with.
    #
    # Pauses are kept in a two-level hash: topic name => partition number => pause object.
    class PausesManager
      # Builds an empty registry. The per-topic hash is created on first access of a topic.
      #
      # @return [Karafka::Connection::PausesManager] pauses manager
      def initialize
        @pauses = Hash.new { |topics, topic| topics[topic] = {} }
      end

      # Creates or fetches pause of a given topic partition.
      #
      # The pause is built only once per topic partition (using the pause settings read from
      # `Karafka::App.config` at that moment) and the same instance is returned afterwards.
      #
      # @param topic [String] topic name
      # @param partition [Integer] partition number
      # @return [Karafka::TimeTrackers::Pause] pause instance
      def fetch(topic, partition)
        @pauses[topic][partition] ||= begin
          config = Karafka::App.config

          TimeTrackers::Pause.new(
            timeout: config.pause_timeout,
            max_timeout: config.pause_max_timeout,
            exponential_backoff: config.pause_with_exponential_backoff
          )
        end
      end

      # Resumes processing of partitions for which pause time has ended.
      #
      # Only pauses that report both `paused?` and `expired?` are resumed; for each of them
      # `#resume` is called on the pause first and then the topic and partition are yielded.
      # A block is expected: `yield` is invoked for every resumed pause.
      #
      # @yieldparam [String] topic name
      # @yieldparam [Integer] partition number
      def resume
        @pauses.each do |topic, partitions|
          partitions.each do |partition, pause|
            # `paused?` is checked first so `expired?` is never asked of a pause that is not active
            next unless pause.paused? && pause.expired?

            pause.resume

            yield(topic, partition)
          end
        end
      end
    end
  end
end
Version data entries
6 entries across 6 versions & 1 rubygems