Sha256: a869426cf62f3a989af396ec5d1857d35104c0b096a42b1ecf2a46e0051c634d
Contents?: true
Size: 854 Bytes
Versions: 2
Compression:
Stored size: 854 Bytes
Contents
module NulogyMessageBusConsumer
  module Steps
    class StreamMessagesUntilNoneAreLeft
      def initialize(logger, timeout = 250)
        @logger = logger
        @timeout = timeout
      end

      def call(kafka_consumer:, **_)
        KafkaUtils.every_message_until_none_are_left(kafka_consumer, @timeout).each do |kafka_message|
          result = yield(
            message: Message.from_kafka(kafka_message),
            kafka_message: kafka_message
          )

          if result == :failure
            # stop reading on failure or else we'll get stuck in a loop
            return :failure
          end
        end

        :success
      rescue => e
        @logger.error(JSON.dump({
          event: "message_processing_errored",
          class: e.class,
          message: e.message
        }))

        raise
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems