Sha256: 912ca8a1ab8cc60ec9ac0750c6e1620cb10d87744e78d852ed03583ec2e2498e

Contents?: true

Size: 1.15 KB

Versions: 8

Compression:

Stored size: 1.15 KB

Contents

# frozen_string_literal: true

module WaterDrop
  # Namespace for all the clients that WaterDrop may use under the hood
  module Clients
    # Default Rdkafka client.
    # Since we use the ::Rdkafka::Producer under the hood, this is just a module that aligns with
    # client building API for the convenience.
    module Rdkafka
      class << self
        # @param producer [WaterDrop::Producer] producer instance with its config, etc
        # @note We overwrite this that way, because we do not care
        def new(producer)
          config = producer.config.kafka.to_h

          client = ::Rdkafka::Config.new(config).producer

          # This callback is not global and is per client, thus we do not have to wrap it with a
          # callbacks manager to make it work
          client.delivery_callback = Instrumentation::Callbacks::Delivery.new(
            producer.id,
            producer.transactional?,
            producer.config.monitor
          )

          # Switch to the transactional mode if user provided the transactional id
          client.init_transactions if config.key?(:'transactional.id')

          client
        end
      end
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
waterdrop-2.7.0.alpha3 lib/waterdrop/clients/rdkafka.rb
waterdrop-2.7.0.alpha2 lib/waterdrop/clients/rdkafka.rb
waterdrop-2.7.0.alpha1 lib/waterdrop/clients/rdkafka.rb
waterdrop-2.6.14 lib/waterdrop/clients/rdkafka.rb
waterdrop-2.6.13 lib/waterdrop/clients/rdkafka.rb
waterdrop-2.6.12 lib/waterdrop/clients/rdkafka.rb
waterdrop-2.6.11 lib/waterdrop/clients/rdkafka.rb
waterdrop-2.6.10 lib/waterdrop/clients/rdkafka.rb