Sha256: 05ea725d3fec1a8bcb1f00640e1a71440b7e0f1fb651a64ade7ff2db49db8563

Contents?: true

Size: 925 Bytes

Versions: 16

Compression:

Stored size: 925 Bytes

Contents

# frozen_string_literal: true

module Phobos
  module Actions
    class ProcessBatch
      include Phobos::Instrumentation

      attr_reader :metadata

      def initialize(listener:, batch:, listener_metadata:)
        @listener = listener
        @batch = batch
        @listener_metadata = listener_metadata
        @metadata = listener_metadata.merge(
          batch_size: batch.messages.count,
          partition: batch.partition,
          offset_lag: batch.offset_lag
        )
      end

      def execute
        instrument('listener.process_batch', @metadata) do |_metadata|
          @batch.messages.each do |message|
            Phobos::Actions::ProcessMessage.new(
              listener: @listener,
              message: message,
              listener_metadata: @listener_metadata
            ).execute
            @listener.consumer.trigger_heartbeat
          end
        end
      end
    end
  end
end

Version data entries

16 entries across 16 versions & 2 rubygems

Version Path
phobos-2.1.2 lib/phobos/actions/process_batch.rb
phobos-2.1.1 lib/phobos/actions/process_batch.rb
phobos_temp_fork-0.0.4 lib/phobos/actions/process_batch.rb
phobos_temp_fork-0.0.3 lib/phobos/actions/process_batch.rb
phobos_temp_fork-0.0.2 lib/phobos/actions/process_batch.rb
phobos_temp_fork-0.0.1 lib/phobos/actions/process_batch.rb
phobos-2.1.0 lib/phobos/actions/process_batch.rb
phobos-2.0.2 lib/phobos/actions/process_batch.rb
phobos-2.0.1 lib/phobos/actions/process_batch.rb
phobos-2.0.0.pre.beta1 lib/phobos/actions/process_batch.rb
phobos-1.9.0 lib/phobos/actions/process_batch.rb
phobos-1.9.0.pre.beta3 lib/phobos/actions/process_batch.rb
phobos-1.9.0.pre.beta2 lib/phobos/actions/process_batch.rb
phobos-1.9.0.pre.beta1 lib/phobos/actions/process_batch.rb
phobos-1.8.3.pre.beta2 lib/phobos/actions/process_batch.rb
phobos-1.8.3.pre.beta1 lib/phobos/actions/process_batch.rb