Sha256: 83b39e407abf6a15cb8743470ba76ef446343a6d8d4950317850e105de6d35f0
Contents?: true
Size: 568 Bytes
Versions: 6
Compression:
Stored size: 568 Bytes
Contents
module NulogyMessageBusConsumer
  module Steps
    class StreamMessages
      def initialize(logger)
        @logger = logger
      end

      def call(kafka_consumer:, **_)
        kafka_consumer.each do |kafka_message|
          yield(
            message: Message.from_kafka(kafka_message),
            kafka_message: kafka_message
          )
        end
      rescue => e
        @logger.error(JSON.dump({
          event: "message_processing_errored",
          class: e.class,
          message: e.message
        }))

        raise
      end
    end
  end
end
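The following is a minimal usage sketch, not part of the gem, showing how the step above might behave when driven by hand. It repeats the class from the listing so that it runs on its own. Everything else is an assumption made for illustration: the `Message` struct is a hypothetical stand-in for the gem's real `Message` class, `RecordingLogger` is a hypothetical logger, and a plain array stands in for the Kafka consumer (anything that responds to `each` would do). The listing does not require `json` itself, so the sketch requires it explicitly.

```ruby
require "json"

module NulogyMessageBusConsumer
  # Hypothetical stand-in for the gem's Message class (assumption).
  Message = Struct.new(:payload) do
    def self.from_kafka(kafka_message)
      new(kafka_message.to_s.upcase)
    end
  end

  module Steps
    # Repeated from the listing above so the sketch is self-contained.
    class StreamMessages
      def initialize(logger)
        @logger = logger
      end

      def call(kafka_consumer:, **_)
        kafka_consumer.each do |kafka_message|
          yield(
            message: Message.from_kafka(kafka_message),
            kafka_message: kafka_message
          )
        end
      rescue => e
        @logger.error(JSON.dump({
          event: "message_processing_errored",
          class: e.class,
          message: e.message
        }))

        raise
      end
    end
  end
end

# Hypothetical logger that keeps error lines in memory (assumption).
class RecordingLogger
  attr_reader :errors

  def initialize
    @errors = []
  end

  def error(line)
    @errors << line
  end
end

logger = RecordingLogger.new
step = NulogyMessageBusConsumer::Steps::StreamMessages.new(logger)

# Happy path: each raw message is yielded alongside its parsed form.
seen = []
step.call(kafka_consumer: ["a", "b"]) do |message:, kafka_message:|
  seen << [message.payload, kafka_message]
end

# Failure path: an error raised downstream is logged as JSON, then re-raised.
failure = nil
begin
  step.call(kafka_consumer: ["c"]) { |**_args| raise ArgumentError, "boom" }
rescue ArgumentError => e
  failure = e
end
```

Because the `rescue` sits at method level, it also catches errors raised inside the block passed to `call`, so the step logs failures from later stages before letting them propagate.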
Version data entries
6 entries across 6 versions & 1 rubygems