Sha256: 4bc094e7117d140d6b43bf97df59a0599fe5389ffcff76166f6046b334f539f1

Contents?: true

Size: 1.23 KB

Versions: 11

Compression:

Stored size: 1.23 KB

Contents

# frozen_string_literal: true

module Kafka
  class FetchedOffsetResolver
    def initialize(logger:)
      @logger = logger
    end

    def resolve!(broker, topics)
      pending_topics = filter_pending_topics(topics)
      return topics if pending_topics.empty?

      response = broker.list_offsets(topics: pending_topics)

      pending_topics.each do |topic, partitions|
        partitions.each do |options|
          partition = options.fetch(:partition)
          resolved_offset = response.offset_for(topic, partition)

          @logger.debug "Offset for #{topic}/#{partition} is #{resolved_offset.inspect}"

          topics[topic][partition][:fetch_offset] = resolved_offset || 0
        end
      end
    end

    private

    def filter_pending_topics(topics)
      pending_topics = {}
      topics.each do |topic, partitions|
        partitions.each do |partition, options|
          offset = options.fetch(:fetch_offset)
          next if offset >= 0

          @logger.debug "Resolving offset `#{offset}` for #{topic}/#{partition}..."

          pending_topics[topic] ||= []
          pending_topics[topic] << {
            partition: partition,
            time: offset
          }
        end
      end
      pending_topics
    end
  end
end

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
ruby-kafka-0.7.5 lib/kafka/fetched_offset_resolver.rb
ruby-kafka-0.7.5.beta1 lib/kafka/fetched_offset_resolver.rb
ruby-kafka-0.7.4 lib/kafka/fetched_offset_resolver.rb
ruby-kafka-0.7.3 lib/kafka/fetched_offset_resolver.rb
ruby-kafka-0.7.2 lib/kafka/fetched_offset_resolver.rb
ruby-kafka-0.7.1.debugcorruption2 lib/kafka/fetched_offset_resolver.rb
ruby-kafka-0.7.1.messagecorruptiondebug lib/kafka/fetched_offset_resolver.rb
ruby-kafka-0.7.1 lib/kafka/fetched_offset_resolver.rb
ruby-kafka-0.7.1.beta2 lib/kafka/fetched_offset_resolver.rb
ruby-kafka-0.7.1.possible.pre.fetch.pre.fix lib/kafka/fetched_offset_resolver.rb
ruby-kafka-0.7.1.beta1 lib/kafka/fetched_offset_resolver.rb