Sha256: 4c00daf5707958a1556126ad8d10a6606cb879a42e12e7be6cffed78b2da24d1
Contents?: true
Size: 645 Bytes
Versions: 5
Compression:
Stored size: 645 Bytes
Contents
module NulogyMessageBusConsumer module Steps class StreamMessagesUntilNoneAreLeft def initialize(logger) @logger = logger end def call(kafka_consumer:, **_) KafkaUtils.every_message_until_none_are_left(kafka_consumer).each do |kafka_message| yield( message: Message.from_kafka(kafka_message), kafka_message: kafka_message ) end rescue StandardError => e @logger.error(JSON.dump({ event: "message_processing_errored", class: e.class, message: e.message, })) raise end end end end
Version data entries
5 entries across 5 versions & 1 rubygems