module NulogyMessageBusConsumer
  # Helper functions for driving a consumer object: waiting on its partition
  # assignment, draining messages, and seeking every assigned partition.
  #
  # The consumer passed to these helpers must respond to +assignment+,
  # +poll+ and +seek+. All helpers are exposed as module functions
  # (e.g. +KafkaUtils.seek_beginning(consumer)+).
  module KafkaUtils
    # Offset used by {seek_beginning}.
    # NOTE(review): this is the absolute offset 0, not a logical "earliest"
    # marker. If the earliest retained offset of a partition can be greater
    # than 0, confirm how the underlying client handles the seek.
    BEGINNING_OFFSET = 0

    # Offset used by {seek_ending}.
    # NOTE(review): presumably the client's logical "end of partition"
    # sentinel - confirm against the client library.
    END_OFFSET = -1

    module_function

    # Polls until the consumer reports a non-empty assignment, or until
    # {wait_for} runs out of attempts (in which case it returns silently).
    #
    # @param consumer [#assignment]
    # @return [nil, Integer] see {wait_for}
    def wait_for_assignment(consumer)
      wait_for { !consumer.assignment.empty? }
    end

    # Polls until the consumer reports an empty assignment, or until
    # {wait_for} runs out of attempts (in which case it returns silently).
    #
    # @param consumer [#assignment]
    # @return [nil, Integer] see {wait_for}
    def wait_for_unassignment(consumer)
      wait_for { consumer.assignment.empty? }
    end

    # Repeatedly evaluates the given block until it returns a truthy value,
    # sleeping +interval+ seconds between evaluations. Gives up without
    # raising once the block has been evaluated +attempts+ times.
    #
    # @param attempts [Integer] maximum number of times the block is evaluated
    # @param interval [Numeric] seconds to sleep between evaluations
    # @yieldreturn [Boolean] truthy once the awaited condition holds
    # @return [nil, Integer] +nil+ when the block returned a truthy value,
    #   otherwise +attempts+
    def wait_for(attempts: 100, interval: 0.1)
      attempts.times do |attempt|
        break if yield

        # Nothing is re-checked after the final attempt, so do not pause there.
        sleep interval if attempt < attempts - 1
      end
    end

    # Builds a lazy enumerator over the consumer's messages. Each step calls
    # +consumer.poll+; enumeration ends the first time +poll+ returns a
    # falsy value.
    #
    # @param consumer [#poll]
    # @param poll_timeout [Integer] value handed to +consumer.poll+ on every
    #   call (presumably a timeout in milliseconds - confirm against the
    #   consumer's API)
    # @return [Enumerator] yields each polled message in order
    def every_message_until_none_are_left(consumer, poll_timeout: 250)
      Enumerator.new do |yielder|
        while (message = consumer.poll(poll_timeout))
          yielder.yield(message)
        end
      end
    end

    # Waits for an assignment, then seeks every assigned partition to
    # {BEGINNING_OFFSET}.
    #
    # @param consumer [#assignment, #seek]
    # @return [Array<Array(String, Integer)>] the +[topic, partition]+ pairs
    #   that were sought
    def seek_beginning(consumer)
      seek_assigned_partitions(consumer, BEGINNING_OFFSET)
    end

    # Waits for an assignment, then seeks every assigned partition to
    # {END_OFFSET}.
    #
    # @param consumer [#assignment, #seek]
    # @return [Array<Array(String, Integer)>] the +[topic, partition]+ pairs
    #   that were sought
    def seek_ending(consumer)
      seek_assigned_partitions(consumer, END_OFFSET)
    end

    # Flattens the consumer's assignment into +[topic_name, partition_number]+
    # pairs. The assignment is converted with +to_h+ and is expected to map
    # each topic name to a list of objects responding to +partition+.
    #
    # @param consumer [#assignment]
    # @return [Array<Array>] one pair per assigned partition
    def assigned_partitions(consumer)
      consumer.assignment.to_h.flat_map do |topic_name, partitions|
        partitions.map { |partition| [topic_name, partition.partition] }
      end
    end

    # Shared implementation of {seek_beginning} and {seek_ending}: waits for
    # an assignment and seeks each assigned partition to +offset+ by handing
    # the consumer a Message that carries the target position.
    #
    # @param consumer [#assignment, #seek]
    # @param offset [Integer] offset placed on each Message given to +seek+
    # @return [Array<Array>] the +[topic, partition]+ pairs that were sought
    def seek_assigned_partitions(consumer, offset)
      wait_for_assignment(consumer)

      assigned_partitions(consumer).each do |topic_name, partition|
        consumer.seek(
          Message.new(topic: topic_name, partition: partition, offset: offset)
        )
      end
    end
    # Internal helper: keep it off the module's public surface.
    private_class_method :seek_assigned_partitions
  end
end