Sha256: fcfb398d46f06598545b815dc01a14a63c4cd9dd1df1fbb0b70d8dbf76f1682e

Contents?: true

Size: 1.31 KB

Versions: 2

Compression:

Stored size: 1.31 KB

Contents

module NulogyMessageBusConsumer
  module Steps
    # Step that wraps message processing and commits the message's offset
    # only when the wrapped block reports success.
    #
    # The block given to #call must return either :success or :failure; any
    # other value raises a StandardError.
    class CommitOnSuccess
      # The only values the wrapped block is allowed to return.
      VALID_OUTCOMES = %i[success failure].freeze

      # @param logger [#info] receives one JSON-encoded line per processed message
      def initialize(logger)
        @logger = logger
      end

      # Runs the given block and acts on its outcome.
      #
      # On :success the message offset is stored and committed. On :failure
      # the consumer is unsubscribed and resubscribed (see
      # #reconnect_to_reprocess_same_message). Either way a JSON line is
      # logged at info level and the block's result is returned.
      #
      # @param kafka_consumer [#store_offset, #commit, #subscription, #unsubscribe, #subscribe]
      # @param message [#id, #to_h] the message being processed
      # @yieldreturn [Symbol] :success or :failure
      # @return [Symbol] the value returned by the block
      # @raise [StandardError] when the block returns anything else
      def call(kafka_consumer:, message:, **_)
        result = yield

        raise_if_invalid(result)

        if result == :success
          kafka_consumer.store_offset(message)
          kafka_consumer.commit
          log_event("message_committed", message)
        else
          reconnect_to_reprocess_same_message(kafka_consumer)
          log_event("message_failed", message)
        end

        result
      end

      private

      # Logs a single JSON line describing what happened to +message+.
      def log_event(event, message)
        @logger.info(JSON.dump({
          event: event,
          kafka_message_id: message.id,
          message: message.to_h
        }))
      end

      # Drops the current subscription and subscribes again to the same keys,
      # then waits for assignment. The offset is deliberately not committed
      # here; per the method name, the intent is that the failed message is
      # delivered again after resubscribing.
      def reconnect_to_reprocess_same_message(kafka_consumer)
        subscriptions = kafka_consumer.subscription
        kafka_consumer.unsubscribe
        # NOTE(review): presumably the keys of the subscription hash are topic
        # names - confirm against the consumer implementation.
        kafka_consumer.subscribe(*subscriptions.to_h.keys)
        KafkaUtils.wait_for_assignment(kafka_consumer)
      end

      # Guards against processing blocks that return something other than the
      # two supported outcomes.
      def raise_if_invalid(result)
        return if VALID_OUTCOMES.include?(result)

        # `inspect` keeps the offending value's type visible: a String
        # "success", nil, and the Symbol :success all render differently.
        raise(
          StandardError,
          "#{result.inspect} is not a valid processing outcome. Must be :success or :failure"
        )
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-3.0.0 lib/nulogy_message_bus_consumer/steps/commit_on_success.rb
nulogy_message_bus_consumer-2.0.1 lib/nulogy_message_bus_consumer/steps/commit_on_success.rb