Sha256: 95d5afa62a0d3c46df4a0733b2ac0983b86e193bb624eb4b33cb297d7400c8d8

Contents?: true

Size: 1.49 KB

Versions: 8

Compression:

Stored size: 1.49 KB

Contents

module NulogyMessageBusConsumer
  # Stateless helpers for waiting on, draining and repositioning a consumer.
  #
  # Every `consumer` argument is only required to respond to the methods each
  # helper actually calls (`assignment`, `poll`, `seek`).
  # NOTE(review): the shape matches Rdkafka::Consumer — confirm against callers.
  module KafkaUtils
    # Offset handed to `consumer.seek` by {seek_beginning}.
    # NOTE(review): this is the literal offset 0, not a "beginning" sentinel —
    # confirm that is intended if early log segments can be deleted.
    BEGINNING_OFFSET = 0

    # Offset handed to `consumer.seek` by {seek_ending}.
    # NOTE(review): presumably the client's "end of partition" sentinel — confirm.
    END_OFFSET = -1

    module_function

    # Blocks until the consumer reports a non-empty assignment, or until
    # {wait_for} runs out of attempts (in which case it returns silently).
    #
    # @param consumer [#assignment]
    # @return [nil, Integer] see {wait_for}
    def wait_for_assignment(consumer)
      wait_for { !consumer.assignment.empty? }
    end

    # Blocks until the consumer reports an empty assignment, or until
    # {wait_for} runs out of attempts (in which case it returns silently).
    #
    # @param consumer [#assignment]
    # @return [nil, Integer] see {wait_for}
    def wait_for_unassignment(consumer)
      wait_for { consumer.assignment.empty? }
    end

    # Calls the block up to `attempts` times, stopping as soon as it returns a
    # truthy value and sleeping `interval` seconds between consecutive checks.
    #
    # @param attempts [Integer] maximum number of times the block is evaluated
    # @param interval [Numeric] seconds slept between two checks
    # @yieldreturn [Object] truthy once the awaited condition holds
    # @return [nil, Integer] nil when the block returned truthy; otherwise
    #   `attempts` (the result of Integer#times). Note the truthiness is the
    #   opposite of "succeeded", so do not branch on it naively.
    def wait_for(attempts: 100, interval: 0.1)
      final_attempt = attempts - 1
      attempts.times do |attempt|
        break if yield

        # No further check follows the last attempt, so sleeping would only
        # delay the caller.
        sleep interval unless attempt == final_attempt
      end
    end

    # Builds an enumerator that polls the consumer and yields each message,
    # finishing the first time `poll` returns nil/false. Nothing is polled
    # until the enumerator is iterated.
    #
    # @param consumer [#poll]
    # @param poll_timeout [Integer] value passed straight to `consumer.poll`
    #   (presumably milliseconds — confirm against the consumer's API)
    # @return [Enumerator] of whatever `consumer.poll` returns
    def every_message_until_none_are_left(consumer, poll_timeout: 250)
      Enumerator.new do |yielder|
        while (message = consumer.poll(poll_timeout))
          yielder.yield(message)
        end
      end
    end

    # Waits for an assignment, then seeks every assigned partition to
    # BEGINNING_OFFSET. If no assignment shows up in time, nothing is sought.
    #
    # @param consumer [#assignment, #seek]
    # @return [Array<Array>] the `[topic_name, partition]` pairs that were sought
    def seek_beginning(consumer)
      seek_assigned_partitions(consumer, BEGINNING_OFFSET)
    end

    # Waits for an assignment, then seeks every assigned partition to
    # END_OFFSET. If no assignment shows up in time, nothing is sought.
    #
    # @param consumer [#assignment, #seek]
    # @return [Array<Array>] the `[topic_name, partition]` pairs that were sought
    def seek_ending(consumer)
      seek_assigned_partitions(consumer, END_OFFSET)
    end

    # Flattens the consumer's assignment into `[topic_name, partition]` pairs,
    # where `partition` is the value of `#partition` on each assigned entry.
    #
    # @param consumer [#assignment] its assignment must respond to `to_h` and
    #   map topic names to collections of objects responding to `#partition`
    # @return [Array<Array>]
    def assigned_partitions(consumer)
      consumer.assignment.to_h.flat_map do |topic_name, partitions|
        # A topic may map to nil (no partitions listed); treat that as empty
        # instead of raising.
        (partitions || []).map { |partition| [topic_name, partition.partition] }
      end
    end

    # Shared implementation of seek_beginning / seek_ending: seeks every
    # assigned partition of the consumer to `offset`.
    def seek_assigned_partitions(consumer, offset)
      wait_for_assignment(consumer)
      assigned_partitions(consumer).each do |topic_name, partition|
        consumer.seek(Message.new(topic: topic_name, partition: partition, offset: offset))
      end
    end
    # Keep the helper out of the module's public API.
    private_class_method :seek_assigned_partitions
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-2.0.0 lib/nulogy_message_bus_consumer/kafka_utils.rb
nulogy_message_bus_consumer-1.0.0 lib/nulogy_message_bus_consumer/kafka_utils.rb
nulogy_message_bus_consumer-0.5.0 lib/nulogy_message_bus_consumer/kafka_utils.rb
nulogy_message_bus_consumer-1.0.0.alpha lib/nulogy_message_bus_consumer/kafka_utils.rb
nulogy_message_bus_consumer-0.4.0 lib/nulogy_message_bus_consumer/kafka_utils.rb
nulogy_message_bus_consumer-0.3.3 lib/nulogy_message_bus_consumer/kafka_utils.rb
nulogy_message_bus_consumer-0.3.2 lib/nulogy_message_bus_consumer/kafka_utils.rb
nulogy_message_bus_consumer-0.3.1 lib/nulogy_message_bus_consumer/kafka_utils.rb