Sha256: 83b39e407abf6a15cb8743470ba76ef446343a6d8d4950317850e105de6d35f0

Contents?: true

Size: 568 Bytes

Versions: 6

Compression:

Stored size: 568 Bytes

Contents

module NulogyMessageBusConsumer
  module Steps
    class StreamMessages
      def initialize(logger)
        @logger = logger
      end

      def call(kafka_consumer:, **_)
        kafka_consumer.each do |kafka_message|
          yield(
            message: Message.from_kafka(kafka_message),
            kafka_message: kafka_message
          )
        end
      rescue => e
        @logger.error(JSON.dump({
          event: "message_processing_errored",
          class: e.class,
          message: e.message
        }))

        raise
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-2.0.1 lib/nulogy_message_bus_consumer/steps/stream_messages.rb
nulogy_message_bus_consumer-2.0.0 lib/nulogy_message_bus_consumer/steps/stream_messages.rb
nulogy_message_bus_consumer-1.0.0 lib/nulogy_message_bus_consumer/steps/stream_messages.rb
nulogy_message_bus_consumer-0.5.0 lib/nulogy_message_bus_consumer/steps/stream_messages.rb
nulogy_message_bus_consumer-1.0.0.alpha lib/nulogy_message_bus_consumer/steps/stream_messages.rb
nulogy_message_bus_consumer-0.4.0 lib/nulogy_message_bus_consumer/steps/stream_messages.rb