Sha256: 463870ecd4ded590cdabe528ca61c5ba10a1d11f6147bcc901a441931e41b0e2

Contents?: true

Size: 1.12 KB

Versions: 2

Compression:

Stored size: 1.12 KB

Contents

# frozen_string_literal: true

module WaterDrop
  # Namespace for all the clients that WaterDrop may use under the hood
  module Clients
    # Default Rdkafka client.
    # Since we use ::Rdkafka::Producer under the hood, this is just a module that aligns with
    # the client building API for convenience.
    module Rdkafka
      class << self
        # @param producer [WaterDrop::Producer] producer instance with its config, etc
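        # @return [::Rdkafka::Producer] producer client built from the kafka config
        # @note We define `new` on the module so it can be used like a class constructor, even
        #   though it returns an ::Rdkafka::Producer and not an instance of this module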
        def new(producer)
          config = producer.config.kafka.to_h

          client = ::Rdkafka::Config.new(config).producer

          # This callback is not global and is per client, thus we do not have to wrap it with a
          # callbacks manager to make it work
          client.delivery_callback = Instrumentation::Callbacks::Delivery.new(
            producer.id,
            producer.config.monitor
          )

          # Switch to the transactional mode if the user provided a transactional id
          client.init_transactions if config.key?(:'transactional.id')

          client
        end
      end
    end
  end
end
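
The factory pattern used above (a module whose singleton `new` returns an object of a different type, and which switches to transactional mode only when the config hash contains the `:'transactional.id'` key) can be sketched in isolation. This is a minimal sketch, not WaterDrop's implementation: `FakeClient` and `FakeClients` are hypothetical stand-ins introduced here so the example runs without librdkafka, and the pass-through lambda only takes the place of the real delivery callback.

```ruby
# frozen_string_literal: true

# Hypothetical stand-in for ::Rdkafka::Producer (assumption, not the real client)
class FakeClient
  attr_accessor :delivery_callback
  attr_reader :transactional

  def initialize
    @transactional = false
  end

  # Records that transactional mode was requested
  def init_transactions
    @transactional = true
  end
end

# Hypothetical namespace mirroring the shape of WaterDrop::Clients
module FakeClients
  module Rdkafka
    class << self
      # Defining `new` on the module lets callers treat it like a class,
      # even though the returned object is a FakeClient
      # @param kafka_config [Hash] kafka settings with symbol keys
      def new(kafka_config)
        client = FakeClient.new
        # Per-client callback; here just a pass-through lambda
        client.delivery_callback = ->(report) { report }
        client.init_transactions if kafka_config.key?(:'transactional.id')
        client
      end
    end
  end
end

plain = FakeClients::Rdkafka.new({ 'bootstrap.servers': 'localhost:9092' })
transactional = FakeClients::Rdkafka.new(
  { 'bootstrap.servers': 'localhost:9092', 'transactional.id': 'tx-1' }
)
```

In this sketch only the second client has `init_transactions` called on it, because the key check is made against the plain hash produced from the configuration.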

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
waterdrop-2.6.9 lib/waterdrop/clients/rdkafka.rb
waterdrop-2.6.8 lib/waterdrop/clients/rdkafka.rb