Sha256: 8f2103f8d715d77e68b59758d2d5c4f72854ea199a666f48d42209ce440a716a

Contents?: true

Size: 1.21 KB

Versions: 3

Compression:

Stored size: 1.21 KB

Contents

require "json"

module NulogyMessageBusConsumer
  module Steps
    class Clock
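      # Current time as integer milliseconds since the Unix epoch.
      # Time.zone requires ActiveSupport; %Q is the DateTime millisecond directive.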
      def now
        Time.zone.now.to_datetime.strftime("%Q").to_i
      end
    end

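    # Pipeline step that writes one JSON log line when a message is received
    # and another, with the processing result and latency, after the block returns.
    class LogMessages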
      def initialize(logger, clock: Clock.new)
        @logger = logger
        @clock = clock
      end

      def call(message:, **_)
        @logger.info(JSON.dump({
          event: "message_received",
          kafka_message_id: message.id,
          message: "Received #{message.id}",
        }))

        result = yield

        millis = diff_millis(message.created_at, @clock.now)
        @logger.info(JSON.dump({
          event: "message_processed",
          kafka_message_id: message.id,
          message: "Processed #{message.id}",
          result: result,
          time_to_processed: millis,
        }))

        result
      end

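      # Returns the elapsed milliseconds between message creation and now.
      # Dividing by 1000 treats created_at as microseconds since epoch, which
      # matches Debezium's MicroTimestamp; a true nanosecond value would need
      # a divisor of 1_000_000 instead.
      # https://github.com/debezium/debezium/blob/5a115e902cdc1dc399ec02758dd1039a33e99bc2/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java#L237
      def diff_millis(oldest_micros, newest_millis)
        old_millis = oldest_micros / 1000

        newest_millis - old_millis
      end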
    end
  end
end
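
The injectable `clock:` argument makes the step easy to exercise without Rails or a real Kafka message. The following is a minimal sketch, not code from the gem: it uses a condensed copy of the step so it runs on its own, and `FixedClock`, `FakeMessage`, and `CapturingLogger` are hypothetical stand-ins. It assumes `created_at` is in microseconds since epoch.

```ruby
require "json"

# Condensed copy of the step from the listing above, so this sketch is self-contained.
module Sketch
  class LogMessages
    def initialize(logger, clock:)
      @logger = logger
      @clock = clock
    end

    def call(message:, **_)
      @logger.info(JSON.dump({ event: "message_received", kafka_message_id: message.id }))

      result = yield

      # Assumption: created_at is microseconds since epoch, so / 1000 gives millis.
      millis = @clock.now - (message.created_at / 1000)
      @logger.info(JSON.dump({
        event: "message_processed",
        kafka_message_id: message.id,
        result: result,
        time_to_processed: millis,
      }))

      result
    end
  end
end

# Hypothetical stand-ins, not part of the gem.
FixedClock = Struct.new(:now)               # always reports the same millisecond value
FakeMessage = Struct.new(:id, :created_at)  # only the fields the step reads

class CapturingLogger
  attr_reader :lines

  def initialize
    @lines = []
  end

  def info(line)
    @lines << line
  end
end

logger = CapturingLogger.new
clock = FixedClock.new(1_600_000_000_250)                     # milliseconds
message = FakeMessage.new("abc-123", 1_600_000_000_000_000)   # microseconds (assumed)

step = Sketch::LogMessages.new(logger, clock: clock)
result = step.call(message: message) { :success }

events = logger.lines.map { |line| JSON.parse(line) }
puts events.last["time_to_processed"]
```

With these inputs the step logs two entries and passes the block's return value through unchanged, so it can wrap downstream processing without altering its result.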

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-0.3.3 lib/nulogy_message_bus_consumer/steps/log_messages.rb
nulogy_message_bus_consumer-0.3.2 lib/nulogy_message_bus_consumer/steps/log_messages.rb
nulogy_message_bus_consumer-0.3.1 lib/nulogy_message_bus_consumer/steps/log_messages.rb