Sha256: f23692818e045b6d8b1b8849a449eca45e893e88c428d9116e4f6896d5cde5e2
Contents?: true
Size: 1.48 KB
Versions: 1
Compression:
Stored size: 1.48 KB
Contents
module NulogyMessageBusConsumer module KafkaUtils module_function def wait_for_assignment(consumer) wait_for { !consumer.assignment.empty? } end def wait_for_unassignment(consumer) wait_for { consumer.assignment.empty? } end def wait_for(attempts: 100, interval: 0.1) attempts.times do break if yield sleep interval end end def every_message_until_none_are_left(consumer) Enumerator.new do |yielder| while message = consumer.poll(250) yielder.yield(message) end end end def seek_beginning(consumer) wait_for_assignment(consumer) assigned_partitions(consumer).each do |topic_name, partition| message = Message.new( topic: topic_name, partition: partition, offset: BEGINNING_OFFSET ) consumer.seek(message) end end def seek_ending(consumer) wait_for_assignment(consumer) assigned_partitions(consumer).each do |topic_name, partition| message = Message.new( topic: topic_name, partition: partition, offset: END_OFFSET ) consumer.seek(message) end end def assigned_partitions(consumer) consumer.assignment.to_h .flat_map { |topic_name, partitions| [topic_name].product(partitions) } .map { |topic_name, partition| [topic_name, partition.partition] } end BEGINNING_OFFSET = 0 END_OFFSET = -1 end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
nulogy_message_bus_consumer-0.3.0 | lib/nulogy_message_bus_consumer/kafka_utils.rb |