Sha256: 2fe2897d281605af25fc8ba8f24268587afa54834199dc3c8cd0e6a1528f6283

Contents?: true

Size: 1.08 KB

Versions: 4

Compression:

Stored size: 1.08 KB

Contents

# frozen_string_literal: true

module Phobos
  module Actions
    # Runs every message of a batch through Phobos::Actions::ProcessMessage,
    # one message at a time, inside a single 'listener.process_batch'
    # instrumentation block. A consumer heartbeat is requested after each
    # message.
    class ProcessBatch
      include Phobos::Instrumentation
      include Phobos::Log

      # The listener metadata merged with :batch_size, :partition and
      # :offset_lag taken from the batch; passed to the instrumentation call.
      attr_reader :metadata

      # @param listener object exposing `consumer`; also handed to ProcessMessage
      # @param batch object exposing `messages`, `partition` and `offset_lag`
      # @param listener_metadata object responding to `merge`
      #   (presumably a Hash -- confirm against the caller)
      def initialize(listener:, batch:, listener_metadata:)
        @listener = listener
        @batch = batch
        @listener_metadata = listener_metadata

        batch_details = {
          batch_size: batch.messages.count,
          partition: batch.partition,
          offset_lag: batch.offset_lag
        }
        @metadata = listener_metadata.merge(**batch_details)
      end

      # Processes the batch's messages in order under the
      # 'listener.process_batch' instrumentation event.
      #
      # @return whatever `instrument` returns for the block
      def execute
        instrument('listener.process_batch', @metadata) do |_payload|
          @batch.messages.each { |message| process_then_heartbeat(message) }
        end
      end

      private

      # Executes ProcessMessage for a single message, then requests a heartbeat.
      # Errors raised by the message processing itself are not rescued here.
      def process_then_heartbeat(message)
        processor = Phobos::Actions::ProcessMessage.new(
          listener: @listener,
          message: message,
          listener_metadata: @listener_metadata
        )
        processor.execute
        send_heartbeat
      end

      # Asks the listener's consumer to trigger a heartbeat. A
      # Kafka::HeartbeatError is logged as a warning rather than propagated,
      # so the remaining messages of the batch still get processed.
      def send_heartbeat
        @listener.consumer.trigger_heartbeat
      rescue Kafka::HeartbeatError => e
        log_warn("Error sending Heartbeat #{e.class.name}-#{e}")
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygem

Version Path
phobos-2.1.6 lib/phobos/actions/process_batch.rb
phobos-2.1.5 lib/phobos/actions/process_batch.rb
phobos-2.1.4 lib/phobos/actions/process_batch.rb
phobos-2.1.3 lib/phobos/actions/process_batch.rb