Sha256: 2fe2897d281605af25fc8ba8f24268587afa54834199dc3c8cd0e6a1528f6283
Contents?: true
Size: 1.08 KB
Versions: 4
Compression:
Stored size: 1.08 KB
Contents
# frozen_string_literal: true

module Phobos
  module Actions
    # Processes every message of a single batch, one at a time, inside one
    # 'listener.process_batch' instrumentation event.
    #
    # Each message is delegated to Phobos::Actions::ProcessMessage; after each
    # message the listener's consumer is asked to trigger a heartbeat.
    class ProcessBatch
      include Phobos::Instrumentation
      include Phobos::Log

      # The listener metadata merged with :batch_size, :partition and
      # :offset_lag taken from the batch at construction time.
      attr_reader :metadata

      # @param listener object exposing `consumer` (used for heartbeats); it is
      #   also handed to ProcessMessage for every message
      # @param batch object responding to `messages`, `partition` and
      #   `offset_lag`
      # @param listener_metadata object responding to `merge` (presumably a
      #   Hash -- confirm against callers); passed unchanged to ProcessMessage
      def initialize(listener:, batch:, listener_metadata:)
        @listener = listener
        @batch = batch
        @listener_metadata = listener_metadata

        batch_details = {
          batch_size: batch.messages.count,
          partition: batch.partition,
          offset_lag: batch.offset_lag
        }
        @metadata = listener_metadata.merge(batch_details)
      end

      # Runs ProcessMessage for each message of the batch, in order, then
      # triggers a heartbeat after each one. The whole loop is wrapped in a
      # single `instrument` call that receives the batch-level metadata.
      #
      # @return whatever `instrument` returns for the block
      def execute
        instrument('listener.process_batch', @metadata) do |_metadata|
          @batch.messages.each do |msg|
            processor = Phobos::Actions::ProcessMessage.new(
              listener: @listener,
              message: msg,
              listener_metadata: @listener_metadata
            )
            processor.execute

            heartbeat_with_warning
          end
        end
      end

      private

      # Asks the consumer to trigger a heartbeat. A Kafka::HeartbeatError is
      # logged as a warning instead of being raised, so a failed heartbeat
      # does not abort processing of the remaining messages.
      def heartbeat_with_warning
        @listener.consumer.trigger_heartbeat
      rescue Kafka::HeartbeatError => error
        log_warn("Error sending Heartbeat #{error.class.name}-#{error}")
      end
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems