Sha256: 4c00daf5707958a1556126ad8d10a6606cb879a42e12e7be6cffed78b2da24d1

Contents?: true

Size: 645 Bytes

Versions: 5

Compression:

Stored size: 645 Bytes

Contents

module NulogyMessageBusConsumer
  module Steps
    class StreamMessagesUntilNoneAreLeft
      def initialize(logger)
        @logger = logger
      end

      def call(kafka_consumer:, **_)
        KafkaUtils.every_message_until_none_are_left(kafka_consumer).each do |kafka_message|
          yield(
            message: Message.from_kafka(kafka_message),
            kafka_message: kafka_message
          )
        end
      rescue StandardError => e
        @logger.error(JSON.dump({
          event: "message_processing_errored",
          class: e.class,
          message: e.message,
        }))

        raise
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-0.3.3 lib/nulogy_message_bus_consumer/steps/stream_messages_until_none_are_left.rb
nulogy_message_bus_consumer-0.3.2 lib/nulogy_message_bus_consumer/steps/stream_messages_until_none_are_left.rb
nulogy_message_bus_consumer-0.3.1 lib/nulogy_message_bus_consumer/steps/stream_messages_until_none_are_left.rb
nulogy_message_bus_consumer-0.3.0 lib/nulogy_message_bus_consumer/steps/stream_messages_until_none_are_left.rb
nulogy_message_bus_consumer-0.2.0 lib/nulogy_message_bus_consumer/steps/stream_messages_until_none_are_left.rb