Sha256: 95d5afa62a0d3c46df4a0733b2ac0983b86e193bb624eb4b33cb297d7400c8d8
Contents?: true
Size: 1.49 KB
Versions: 8
Compression:
Stored size: 1.49 KB
Contents
module NulogyMessageBusConsumer module KafkaUtils module_function def wait_for_assignment(consumer) wait_for { !consumer.assignment.empty? } end def wait_for_unassignment(consumer) wait_for { consumer.assignment.empty? } end def wait_for(attempts: 100, interval: 0.1) attempts.times do break if yield sleep interval end end def every_message_until_none_are_left(consumer) Enumerator.new do |yielder| while (message = consumer.poll(250)) yielder.yield(message) end end end def seek_beginning(consumer) wait_for_assignment(consumer) assigned_partitions(consumer).each do |topic_name, partition| message = Message.new( topic: topic_name, partition: partition, offset: BEGINNING_OFFSET ) consumer.seek(message) end end def seek_ending(consumer) wait_for_assignment(consumer) assigned_partitions(consumer).each do |topic_name, partition| message = Message.new( topic: topic_name, partition: partition, offset: END_OFFSET ) consumer.seek(message) end end def assigned_partitions(consumer) consumer.assignment.to_h .flat_map { |topic_name, partitions| [topic_name].product(partitions) } .map { |topic_name, partition| [topic_name, partition.partition] } end BEGINNING_OFFSET = 0 END_OFFSET = -1 end end
Version data entries
8 entries across 8 versions & 1 rubygems