Sha256: 35ed1fcacf0960bdde9a11f7dee69b10db7fd983d58155ab2698ba9ea76859cc
Contents?: true
Size: 1.02 KB
Versions: 6
Compression:
Stored size: 1.02 KB
Contents
module NulogyMessageBusConsumer module Steps class DeduplicateMessages def initialize(logger) @logger = logger end def call(message:, **_) return :success if duplicate_exists?(message) result = :unknown ProcessedMessage.transaction(joinable: false) do result = yield record_processed_message(message) if result == :success end result end private def duplicate_exists?(message) if ProcessedMessage.exists?(id: message.id) log_duplicate(message) return true end false end def record_processed_message(message) ProcessedMessage.create!(id: message.id) rescue ActiveRecord::RecordNotUnique log_duplicate(message) raise ActiveRecord::Rollback end def log_duplicate(message) @logger.warn(JSON.dump({ event: "duplicate_message_detected", kafka_message_id: message.id })) end end end end
Version data entries
6 entries across 6 versions & 1 rubygems