Sha256: 9ba278a4d19cf6cc9014c0c759c49b6c7a16eb3123089bbece8e4a1944e0bb70

Contents?: true

Size: 1.58 KB

Versions: 4

Compression:

Stored size: 1.58 KB

Contents

# frozen_string_literal: true

require 'phobos/batch_message'
require 'phobos/processor'

module Phobos
  module Actions
    class ProcessBatchInline
      include Phobos::Processor

      attr_reader :metadata

      def initialize(listener:, batch:, metadata:)
        @listener = listener
        @batch = batch
        @listener = listener
        @batch = batch
        @metadata = metadata.merge(
          batch_size: batch.messages.count,
          partition: batch.partition,
          offset_lag: batch.offset_lag,
          retry_count: 0
        )
      end

      def execute
        payloads = @batch.messages.map do |message|
          Phobos::BatchMessage.new(
            key: message.key,
            partition: message.partition,
            offset: message.offset,
            payload: force_encoding(message.value)
          )
        end

        begin
          process_batch(payloads)
        rescue StandardError => e
          handle_error(e, 'listener.retry_handler_error_batch',
                       "error processing inline batch, waiting #{backoff_interval}s")
          retry
        end
      end

      private

      def process_batch(payloads)
        instrument('listener.process_batch_inline', @metadata) do |_metadata|
          handler = @listener.handler_class.new

          preprocessed_payloads = handler.before_consume_batch(payloads, @metadata)
          consume_block = proc { handler.consume_batch(preprocessed_payloads, @metadata) }

          handler.around_consume_batch(preprocessed_payloads, @metadata, &consume_block)
        end
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
phobos-1.8.3.pre.beta1 lib/phobos/actions/process_batch_inline.rb
phobos-1.8.2 lib/phobos/actions/process_batch_inline.rb
phobos-1.8.2.pre.beta2 lib/phobos/actions/process_batch_inline.rb
phobos-1.8.2.pre.beta1 lib/phobos/actions/process_batch_inline.rb