Sha256: 35ed1fcacf0960bdde9a11f7dee69b10db7fd983d58155ab2698ba9ea76859cc

Contents?: true

Size: 1.02 KB

Versions: 6

Compression:

Stored size: 1.02 KB

Contents

module NulogyMessageBusConsumer
  module Steps
    class DeduplicateMessages
      def initialize(logger)
        @logger = logger
      end

      def call(message:, **_)
        return :success if duplicate_exists?(message)

        result = :unknown
        ProcessedMessage.transaction(joinable: false) do
          result = yield

          record_processed_message(message) if result == :success
        end

        result
      end

      private

      def duplicate_exists?(message)
        if ProcessedMessage.exists?(id: message.id)
          log_duplicate(message)

          return true
        end

        false
      end

      def record_processed_message(message)
        ProcessedMessage.create!(id: message.id)
      rescue ActiveRecord::RecordNotUnique
        log_duplicate(message)

        raise ActiveRecord::Rollback
      end

      def log_duplicate(message)
        @logger.warn(JSON.dump({
          event: "duplicate_message_detected",
          kafka_message_id: message.id
        }))
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-2.0.1 lib/nulogy_message_bus_consumer/steps/deduplicate_messages.rb
nulogy_message_bus_consumer-2.0.0 lib/nulogy_message_bus_consumer/steps/deduplicate_messages.rb
nulogy_message_bus_consumer-1.0.0 lib/nulogy_message_bus_consumer/steps/deduplicate_messages.rb
nulogy_message_bus_consumer-0.5.0 lib/nulogy_message_bus_consumer/steps/deduplicate_messages.rb
nulogy_message_bus_consumer-1.0.0.alpha lib/nulogy_message_bus_consumer/steps/deduplicate_messages.rb
nulogy_message_bus_consumer-0.4.0 lib/nulogy_message_bus_consumer/steps/deduplicate_messages.rb