Sha256: dfa96b22791c8f0073ccb87448ccfe5648d0b736c2bdc98b68c31fe62124e266

Contents?: true

Size: 876 Bytes

Versions: 4

Compression:

Stored size: 876 Bytes

Contents

# frozen_string_literal: true

module Phobos
  module Actions
    class ProcessBatch
      include Phobos::Instrumentation

      attr_reader :metadata

      def initialize(listener:, batch:, listener_metadata:)
        @listener = listener
        @batch = batch
        @listener_metadata = listener_metadata
        @metadata = listener_metadata.merge(
          batch_size: batch.messages.count,
          partition: batch.partition,
          offset_lag: batch.offset_lag
        )
      end

      def execute
        instrument('listener.process_batch', @metadata) do |_metadata|
          @batch.messages.each do |message|
            Phobos::Actions::ProcessMessage.new(
              listener: @listener,
              message: message,
              listener_metadata: @listener_metadata
            ).execute
          end
        end
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
phobos-1.8.2 lib/phobos/actions/process_batch.rb
phobos-1.8.2.pre.beta2 lib/phobos/actions/process_batch.rb
phobos-1.8.2.pre.beta1 lib/phobos/actions/process_batch.rb
phobos-1.8.1 lib/phobos/actions/process_batch.rb