Sha256: a869426cf62f3a989af396ec5d1857d35104c0b096a42b1ecf2a46e0051c634d

Contents?: true

Size: 854 Bytes

Versions: 2

Compression:

Stored size: 854 Bytes

Contents

module NulogyMessageBusConsumer
  module Steps
    # Pipeline step that walks every message handed back by
    # KafkaUtils.every_message_until_none_are_left and passes each one to the
    # caller-supplied block.
    class StreamMessagesUntilNoneAreLeft
      # @param logger [#error] receives a JSON-encoded line when processing raises
      # @param timeout [Integer] forwarded unchanged to
      #   KafkaUtils.every_message_until_none_are_left (units are not visible
      #   here; presumably milliseconds -- confirm against KafkaUtils)
      def initialize(logger, timeout = 250)
        @timeout = timeout
        @logger = logger
      end

      # Yields each message to the block as `message:` (built with
      # Message.from_kafka) and `kafka_message:` (the raw item).
      #
      # @param kafka_consumer the consumer handed to KafkaUtils; any other
      #   keyword arguments are accepted and ignored
      # @yieldreturn [Symbol] :failure stops the iteration immediately
      # @return [Symbol] :failure if the block returned :failure for a message,
      #   :success once the stream is exhausted
      # @raise [StandardError] any error raised while streaming or inside the
      #   block is logged and then re-raised unchanged
      def call(kafka_consumer:, **_)
        stream = KafkaUtils.every_message_until_none_are_left(kafka_consumer, @timeout)

        stream.each do |raw|
          outcome = yield(message: Message.from_kafka(raw), kafka_message: raw)

          # Bail out on the first failure; continuing to read would leave us
          # stuck in a loop.
          return :failure if outcome == :failure
        end

        :success
      rescue StandardError => error
        payload = {
          event: "message_processing_errored",
          class: error.class,
          message: error.message
        }
        @logger.error(JSON.dump(payload))

        raise
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-3.0.0 lib/nulogy_message_bus_consumer/steps/stream_messages_until_none_are_left.rb
nulogy_message_bus_consumer-2.0.1 lib/nulogy_message_bus_consumer/steps/stream_messages_until_none_are_left.rb