Sha256: f23692818e045b6d8b1b8849a449eca45e893e88c428d9116e4f6896d5cde5e2

Contents?: true

Size: 1.48 KB

Versions: 1

Compression:

Stored size: 1.48 KB

Contents

module NulogyMessageBusConsumer
  # Stateless helpers for driving a Kafka consumer: waiting on partition
  # assignment, draining messages, and repositioning every assigned partition.
  #
  # All methods are module functions, so they can be called directly
  # (`KafkaUtils.seek_beginning(consumer)`) or mixed in with `include`.
  #
  # The `consumer` argument is duck-typed. The methods below only rely on it
  # responding to `assignment`, `poll(timeout)` and `seek(message)`.
  module KafkaUtils
    # Offset handed to `consumer.seek` by {seek_beginning}.
    BEGINNING_OFFSET = 0

    # Offset handed to `consumer.seek` by {seek_ending}.
    # NOTE(review): -1 is presumably the Kafka client's logical "end of
    # partition" sentinel rather than a literal offset -- confirm.
    END_OFFSET = -1

    module_function

    # Blocks until the consumer reports a non-empty assignment, or until
    # {wait_for} runs out of attempts.
    #
    # @param consumer [#assignment]
    # @return [nil, Integer] see {wait_for}
    def wait_for_assignment(consumer)
      wait_for { !consumer.assignment.empty? }
    end

    # Blocks until the consumer reports an empty assignment, or until
    # {wait_for} runs out of attempts.
    #
    # @param consumer [#assignment]
    # @return [nil, Integer] see {wait_for}
    def wait_for_unassignment(consumer)
      wait_for { consumer.assignment.empty? }
    end

    # Re-evaluates the block until it returns a truthy value or the attempts
    # are used up. Running out of attempts is not an error: nothing is raised.
    #
    # @param attempts [Integer] maximum number of times the block is evaluated
    # @param interval [Numeric] seconds slept after each falsy evaluation
    # @yieldreturn [Object] truthy to stop waiting
    # @return [nil, Integer] nil once the block returned truthy; `attempts`
    #   when every attempt was falsy
    def wait_for(attempts: 100, interval: 0.1)
      attempts.times do
        break if yield

        sleep interval
      end
    end

    # Builds an enumerator that keeps polling the consumer and yields each
    # message, stopping at the first poll that returns nil/false.
    # Nothing is polled until the enumerator is iterated.
    #
    # @param consumer [#poll]
    # @return [Enumerator]
    def every_message_until_none_are_left(consumer)
      Enumerator.new do |yielder|
        # 250 is passed straight to `poll`.
        # NOTE(review): presumably a timeout in milliseconds -- confirm.
        while (message = consumer.poll(250))
          yielder.yield(message)
        end
      end
    end

    # Seeks every assigned partition to BEGINNING_OFFSET after waiting for an
    # assignment. If no assignment shows up in time, nothing is sought.
    #
    # @param consumer [#assignment, #seek]
    # @return [Array<Array>] the `[topic_name, partition]` pairs that were sought
    def seek_beginning(consumer)
      seek_assigned_partitions(consumer, BEGINNING_OFFSET)
    end

    # Seeks every assigned partition to END_OFFSET after waiting for an
    # assignment. If no assignment shows up in time, nothing is sought.
    #
    # @param consumer [#assignment, #seek]
    # @return [Array<Array>] the `[topic_name, partition]` pairs that were sought
    def seek_ending(consumer)
      seek_assigned_partitions(consumer, END_OFFSET)
    end

    # Flattens the consumer's assignment into `[topic_name, partition_number]`
    # pairs, one per assigned partition.
    #
    # @param consumer [#assignment] `assignment.to_h` must map each topic name
    #   to a list of objects responding to `partition`
    # @return [Array<Array>]
    def assigned_partitions(consumer)
      consumer.assignment.to_h.flat_map do |topic_name, partitions|
        # Array() lets a topic whose partition list is nil contribute no pairs
        # instead of raising.
        Array(partitions).map { |partition| [topic_name, partition.partition] }
      end
    end

    # Shared implementation of {seek_beginning} and {seek_ending}: waits for
    # an assignment, then seeks each assigned partition to `offset` by handing
    # the consumer a Message that carries the target position.
    #
    # @param consumer [#assignment, #seek]
    # @param offset [Integer] offset placed on every Message passed to `seek`
    # @return [Array<Array>] the `[topic_name, partition]` pairs that were sought
    def seek_assigned_partitions(consumer, offset)
      wait_for_assignment(consumer)
      assigned_partitions(consumer).each do |topic_name, partition|
        consumer.seek(Message.new(topic: topic_name, partition: partition, offset: offset))
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
nulogy_message_bus_consumer-0.3.0 lib/nulogy_message_bus_consumer/kafka_utils.rb