Sha256: fcfb398d46f06598545b815dc01a14a63c4cd9dd1df1fbb0b70d8dbf76f1682e
Contents?: true
Size: 1.31 KB
Versions: 2
Compression:
Stored size: 1.31 KB
Contents
module NulogyMessageBusConsumer
  module Steps
    # Pipeline step that wraps message processing (the block passed to #call)
    # and reacts to the outcome the block returns:
    #
    # * :success - stores the message offset on the consumer, commits, and
    #   logs a "message_committed" event.
    # * :failure - unsubscribes and resubscribes the consumer to its current
    #   topics, waits for assignment, and logs a "message_failed" event.
    #
    # Any other outcome raises StandardError before the consumer is touched.
    class CommitOnSuccess
      # @param logger [#info] receives one JSON-encoded string per handled message
      def initialize(logger)
        @logger = logger
      end

      # Runs the given block and commits or reconnects based on what it returns.
      #
      # @param kafka_consumer consumer responding to store_offset, commit,
      #   subscription, unsubscribe and subscribe
      # @param message object responding to id and to_h
      # @yieldreturn [Symbol] :success or :failure
      # @return [Symbol] the block's outcome, unchanged
      # @raise [StandardError] when the block returns anything other than
      #   :success or :failure
      def call(kafka_consumer:, message:, **_)
        outcome = yield
        raise_if_invalid(outcome)

        case outcome
        when :success
          kafka_consumer.store_offset(message)
          kafka_consumer.commit
          log_event("message_committed", message)
        else
          # Validation above guarantees this branch is :failure.
          reconnect_to_reprocess_same_message(kafka_consumer)
          log_event("message_failed", message)
        end

        outcome
      end

      private

      # Emits a single info-level log line, JSON-encoded, describing the event.
      def log_event(event_name, message)
        entry = {
          event: event_name,
          kafka_message_id: message.id,
          message: message.to_h
        }
        @logger.info(JSON.dump(entry))
      end

      # Drops the current subscription and subscribes again to the same topics,
      # then blocks until KafkaUtils reports an assignment. Per the method name,
      # the intent is that the uncommitted message gets delivered again.
      def reconnect_to_reprocess_same_message(kafka_consumer)
        previous = kafka_consumer.subscription
        kafka_consumer.unsubscribe

        topics = previous.to_h.keys
        kafka_consumer.subscribe(*topics)

        KafkaUtils.wait_for_assignment(kafka_consumer)
      end

      # Guards against processing blocks that return an unexpected value.
      def raise_if_invalid(result)
        unless %i[success failure].include?(result)
          raise StandardError,
                "'#{result}' is not a valid processing outcome. Must be :success or :failure"
        end
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
nulogy_message_bus_consumer-3.0.0 | lib/nulogy_message_bus_consumer/steps/commit_on_success.rb |
nulogy_message_bus_consumer-2.0.1 | lib/nulogy_message_bus_consumer/steps/commit_on_success.rb |