Sha256: 2472c72b47acf0502909f43b25f495ed822429074d661368205c2b964078f82d

Contents?: true

Size: 844 Bytes

Versions: 4

Compression:

Stored size: 844 Bytes

Contents

module Phobos
  module Actions
    class ProcessBatch
      include Phobos::Instrumentation

      attr_reader :metadata

      def initialize(listener:, batch:, listener_metadata:)
        @listener = listener
        @batch = batch
        @listener_metadata = listener_metadata
        @metadata = listener_metadata.merge(
          batch_size: batch.messages.count,
          partition: batch.partition,
          offset_lag: batch.offset_lag
        )
      end

      def execute
        instrument('listener.process_batch', @metadata) do |metadata|
          @batch.messages.each do |message|
            Phobos::Actions::ProcessMessage.new(
              listener: @listener,
              message: message,
              listener_metadata: @listener_metadata
            ).execute
          end
        end
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
phobos-1.8.0 lib/phobos/actions/process_batch.rb
phobos-1.7.2 lib/phobos/actions/process_batch.rb
phobos-1.7.1 lib/phobos/actions/process_batch.rb
phobos-1.7.0 lib/phobos/actions/process_batch.rb