module NulogyMessageBusConsumer module Steps class DeduplicateMessages def initialize(logger) @logger = logger end def call(message:, **_) return :success if duplicate_exists?(message) result = :unknown ProcessedMessage.transaction(joinable: false) do result = yield record_processed_message(message) if result == :success end result end private def duplicate_exists?(message) if ProcessedMessage.exists?(id: message.id) log_duplicate(message) return true end false end def record_processed_message(message) ProcessedMessage.create!(id: message.id) rescue ActiveRecord::RecordNotUnique log_duplicate(message) raise ActiveRecord::Rollback end def log_duplicate(message) @logger.warn(JSON.dump({ event: "duplicate_message_detected", kafka_message_id: message.id, })) end end end end