Sha256: 8c4a40f16d3e4cebd5292953a2008e69e623962362b658e646ff2213acbff2c9

Contents?: true

Size: 1.2 KB

Versions: 11

Compression:

Stored size: 1.2 KB

Contents

# frozen_string_literal: true

module Kafka
  # Holds an ordered list of interceptors, each of which responds to `call`,
  # and invokes them one after another as a chain.
  class Interceptors
    def initialize(interceptors:, logger:)
      @interceptors = interceptors || []
      @logger = TaggedLogger.new(logger)
    end

    # Called when the client produces a message or once batches have been fetched.
    # The value returned by each interceptor is passed as the argument to the next
    # interceptor in the chain. Errors raised by an interceptor are rescued and logged
    # as warnings, and the chain continues with the value from the last successful
    # interceptor, so interceptor errors do not propagate to the caller.
    #
    # @param intercepted [Kafka::PendingMessage, Kafka::FetchedBatch] the produced message or
    #   fetched batch.
    #
    # @return [Kafka::PendingMessage, Kafka::FetchedBatch] the intercepted message or batch
    #   returned by the last interceptor.
    def call(intercepted)
      @interceptors.each do |interceptor|
        begin
          intercepted = interceptor.call(intercepted)
        rescue Exception => e
          @logger.warn "Error executing interceptor for topic: #{intercepted.topic} " \
            "partition: #{intercepted.partition}: #{e.message}\n#{e.backtrace.join("\n")}"
        end
      end

      intercepted
    end
  end
end
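
The chaining behavior documented above can be illustrated with a small, self-contained sketch. It does not load the ruby-kafka gem: `InterceptorChain` and `Message` are hypothetical stand-ins that mirror the loop in `Kafka::Interceptors#call`, a plain `Logger` replaces `TaggedLogger`, and the sketch rescues `StandardError` rather than `Exception`. The three lambdas are example interceptors invented for illustration.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in mirroring the loop in Kafka::Interceptors#call,
# so the sketch runs without the ruby-kafka gem.
class InterceptorChain
  def initialize(interceptors:, logger:)
    @interceptors = interceptors || []
    @logger = logger
  end

  def call(intercepted)
    @interceptors.each do |interceptor|
      begin
        # Each interceptor receives the previous interceptor's return value.
        intercepted = interceptor.call(intercepted)
      rescue StandardError => e
        # Assumption: the sketch narrows the rescue to StandardError.
        # A failing interceptor is logged and skipped; the value is unchanged.
        @logger.warn("Error executing interceptor for topic: #{intercepted.topic}: #{e.message}")
      end
    end

    intercepted
  end
end

# Hypothetical message type standing in for Kafka::PendingMessage.
Message = Struct.new(:value, :topic, :partition)

# Anything that responds to `call` can act as an interceptor; lambdas do.
upcase  = ->(msg) { Message.new(msg.value.upcase, msg.topic, msg.partition) }
failing = ->(_msg) { raise "boom" }
suffix  = ->(msg) { Message.new("#{msg.value}!", msg.topic, msg.partition) }

log_output = StringIO.new
chain = InterceptorChain.new(
  interceptors: [upcase, failing, suffix],
  logger: Logger.new(log_output)
)

result = chain.call(Message.new("hello", "greetings", 0))
puts result.value # prints "HELLO!"
```

The failing interceptor in the middle leaves the message untouched, so the third interceptor still receives the upcased value, and the error appears only as a warning in the log.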

Version data entries

11 entries across 11 versions & 3 rubygems

Version Path
ruby-kafka-1.5.0 lib/kafka/interceptors.rb
ruby-kafka-aws-iam-1.4.5 lib/kafka/interceptors.rb
ruby-kafka-aws-iam-1.4.4 lib/kafka/interceptors.rb
ruby-kafka-aws-iam-1.4.3 lib/kafka/interceptors.rb
ruby-kafka-aws-iam-1.4.2 lib/kafka/interceptors.rb
ruby-kafka-aws-iam-1.4.1 lib/kafka/interceptors.rb
ruby-kafka-1.4.0 lib/kafka/interceptors.rb
ruby-kafka-temp-fork-0.0.2 lib/kafka/interceptors.rb
ruby-kafka-temp-fork-0.0.1 lib/kafka/interceptors.rb
ruby-kafka-1.3.0 lib/kafka/interceptors.rb
ruby-kafka-1.2.0 lib/kafka/interceptors.rb