Sha256: 61fdcd04d76e4a00f5577e3e226091c2ecea97d11edfe28a7ff1ac0cb002d718

Contents?: true

Size: 1.25 KB

Versions: 22

Compression:

Stored size: 1.25 KB

Contents

# frozen_string_literal: true

module Kafka
  # Fills in concrete fetch offsets for partitions whose requested offset is
  # still a negative placeholder.
  #
  # A partition is considered unresolved when its `:fetch_offset` is negative.
  # The negative value is forwarded unchanged to the broker as the `time` of a
  # list-offsets request, and the broker's answer is written back in place.
  class FetchedOffsetResolver
    # @param logger [Object] underlying logger; it is wrapped in a
    #   `TaggedLogger`, which must respond to `debug`.
    def initialize(logger:)
      @logger = TaggedLogger.new(logger)
    end

    # Replaces every negative `:fetch_offset` in `topics` with the offset
    # reported by `broker`. `topics` is mutated in place.
    #
    # No broker request is made when there is nothing to resolve.
    #
    # @param broker [#list_offsets] called as `list_offsets(topics: ...)`; its
    #   result must respond to `offset_for(topic, partition)`.
    # @param topics [Hash] `{ topic => { partition => { fetch_offset: Integer } } }`
    # @return [Hash] the same `topics` object that was passed in, on every
    #   code path.
    # @raise [KeyError] if a partition's options lack `:fetch_offset`.
    def resolve!(broker, topics)
      pending_topics = filter_pending_topics(topics)
      return topics if pending_topics.empty?

      response = broker.list_offsets(topics: pending_topics)

      pending_topics.each do |topic, partitions|
        partitions.each do |options|
          partition = options.fetch(:partition)
          resolved_offset = response.offset_for(topic, partition)

          @logger.debug "Offset for #{topic}/#{partition} is #{resolved_offset.inspect}"

          # Fall back to offset 0 when the response yields nil/false.
          topics[topic][partition][:fetch_offset] = resolved_offset || 0
        end
      end

      # Return the caller's hash here too, matching the early-exit path above
      # instead of leaking the internal request hash.
      topics
    end

    private

    # Collects the partitions whose `:fetch_offset` is negative.
    #
    # @param topics [Hash] same shape as accepted by {#resolve!}.
    # @return [Hash] `{ topic => [{ partition: partition, time: offset }, ...] }`;
    #   topics with no unresolved partition are omitted entirely.
    def filter_pending_topics(topics)
      topics.each_with_object({}) do |(topic, partitions), pending_topics|
        partitions.each do |partition, options|
          offset = options.fetch(:fetch_offset)
          next if offset >= 0

          @logger.debug "Resolving offset `#{offset}` for #{topic}/#{partition}..."

          (pending_topics[topic] ||= []) << { partition: partition, time: offset }
        end
      end
    end
  end
end

Version data entries

22 entries across 22 versions & 4 rubygems

Version Path
ruby-kafka-0.7.6.beta2 lib/kafka/fetched_offset_resolver.rb
ruby-kafka-0.7.6.beta1 lib/kafka/fetched_offset_resolver.rb