# frozen_string_literal: true # This Karafka component is a Pro component under a commercial license. # This Karafka component is NOT licensed under LGPL. # # All of the commercial components are present in the lib/karafka/pro directory of this # repository and their usage requires commercial license agreement. # # Karafka has also commercial-friendly license, commercial support and commercial components. # # By sending a pull request to the pro components, you are agreeing to transfer the copyright of # your code to Maciej Mensfeld. module Karafka module Pro class Iterator # There are various ways you can provide topics information for iterating. # # This mapper normalizes this data, resolves offsets and maps the time based offsets into # appropriate once # # Following formats are accepted: # # - 'topic1' - just a string with one topic name # - ['topic1', 'topic2'] - just the names # - { 'topic1' => -100 } - names with negative lookup offset # - { 'topic1' => { 0 => 5 } } - names with exact partitions offsets # - { 'topic1' => { 0 => -5 }, 'topic2' => { 1 => 5 } } - with per partition negative offsets # - { 'topic1' => 100 } - means we run all partitions from the offset 100 # - { 'topic1' => Time.now - 60 } - we run all partitions from the message from 60s ago # - { 'topic1' => { 1 => Time.now - 60 } } - partition1 from message 60s ago # class Expander # Expands topics to which we want to subscribe with partitions information in case this # info is not provided. # # @param topics [Array, Hash, String] topics definitions # @return [Hash] expanded and normalized requested topics and partitions data def call(topics) expanded = Hash.new { |h, k| h[k] = {} } normalize_format(topics).map do |topic, details| if details.is_a?(Hash) details.each do |partition, offset| expanded[topic][partition] = offset end else partition_count(topic).times do |partition| # If no offsets are provided, we just start from zero expanded[topic][partition] = details || 0 end end end expanded end private # Input can be provided in multiple formats. Here we normalize it to one (hash). # # @param topics [Array, Hash, String] requested topics # @return [Hash] normalized hash with topics data def normalize_format(topics) # Simplification for the single topic case topics = [topics] if topics.is_a?(String) # If we've got just array with topics, we need to convert that into a representation # that we can expand with offsets topics = topics.map { |name| [name, false] }.to_h if topics.is_a?(Array) # We remap by creating new hash, just in case the hash came as the argument for this # expanded. We do not want to modify user provided hash topics.transform_keys(&:to_s) end # List of topics with their partition information for expansion # We cache it so we do not have to run consecutive requests to obtain data about multiple # topics def topics @topics ||= Admin.cluster_info.topics end # @param name [String] topic name # @return [Integer] number of partitions of the topic we want to iterate over def partition_count(name) topics .find { |topic| topic.fetch(:topic_name) == name } .tap { |topic| topic || raise(Errors::TopicNotFoundError, name) } .fetch(:partitions) .count end end end end end