Sha256: 2a98ebfa5a442abdc0f58f59a08b9245c78071a8514fed6a47fbda9933ac01a6

Contents?: true

Size: 1.87 KB

Versions: 6

Compression:

Stored size: 1.87 KB

Contents

module Kafka
  class OffsetManager
    def initialize(group:, logger:, commit_interval:, commit_threshold:)
      @group = group
      @logger = logger
      @commit_interval = commit_interval
      @commit_threshold = commit_threshold

      @uncommitted_offsets = 0
      @processed_offsets = {}
      @default_offsets = {}
      @committed_offsets = nil
      @last_commit = Time.at(0)
    end

    def set_default_offset(topic, default_offset)
      @default_offsets[topic] = default_offset
    end

    def mark_as_processed(topic, partition, offset)
      @uncommitted_offsets += 1
      @processed_offsets[topic] ||= {}
      @processed_offsets[topic][partition] = offset + 1
    end

    def next_offset_for(topic, partition)
      offset = @processed_offsets.fetch(topic, {}).fetch(partition) {
        committed_offset_for(topic, partition)
      }

      offset = @default_offsets.fetch(topic) if offset < 0

      offset
    end

    def commit_offsets
      unless @processed_offsets.empty?
        @logger.info "Committing offsets for #{@uncommitted_offsets} messages"

        @group.commit_offsets(@processed_offsets)

        @last_commit = Time.now
        @processed_offsets.clear
        @uncommitted_offsets = 0
      end
    end

    def commit_offsets_if_necessary
      if seconds_since_last_commit >= @commit_interval || commit_threshold_reached?
        commit_offsets
      end
    end

    def clear_offsets
      @uncommitted_offsets = 0
      @processed_offsets.clear
      @committed_offsets = nil
    end

    private

    def seconds_since_last_commit
      Time.now - @last_commit
    end

    def committed_offset_for(topic, partition)
      @committed_offsets ||= @group.fetch_offsets
      @committed_offsets.offset_for(topic, partition)
    end

    def commit_threshold_reached?
      @commit_threshold != 0 && @uncommitted_offsets >= @commit_threshold
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
ruby-kafka-0.3.5 lib/kafka/offset_manager.rb
ruby-kafka-0.3.4 lib/kafka/offset_manager.rb
ruby-kafka-0.3.3 lib/kafka/offset_manager.rb
ruby-kafka-0.3.2 lib/kafka/offset_manager.rb
ruby-kafka-0.3.1 lib/kafka/offset_manager.rb
ruby-kafka-0.3.0 lib/kafka/offset_manager.rb