module NulogyMessageBusConsumer module Steps class Clock # milliseconds since epoch def now Time.zone.now.to_datetime.strftime("%Q").to_i end end class LogMessages def initialize(logger, clock: Clock.new) @logger = logger @clock = clock end def call(message:, **_) @logger.info(JSON.dump({ event: "message_received", kafka_message_id: message.id, message: "Received #{message.id}", })) result = yield millis = diff_millis(message.created_at, @clock.now) @logger.info(JSON.dump({ event: "message_processed", kafka_message_id: message.id, message: "Processed #{message.id}", result: result, time_to_processed: millis, })) result end # Debezium appears to be giving us nanos since epoch # https://github.com/debezium/debezium/blob/5a115e902cdc1dc399ec02758dd1039a33e99bc2/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java#L237 def diff_millis(oldest_nanos, newest_millis) old_millis = oldest_nanos / 1000 newest_millis - old_millis end end end end