Sha256: 463870ecd4ded590cdabe528ca61c5ba10a1d11f6147bcc901a441931e41b0e2
Contents?: true
Size: 1.12 KB
Versions: 2
Compression:
Stored size: 1.12 KB
Contents
# frozen_string_literal: true

module WaterDrop
  # Namespace for all the clients that WaterDrop may use under the hood
  module Clients
    # Default Rdkafka client.
    # Since we use the ::Rdkafka::Producer under the hood, this is just a module that aligns with
    # client building API for the convenience.
    module Rdkafka
      class << self
        # @param producer [WaterDrop::Producer] producer instance with its config, etc
        # @note We overwrite this that way, because we do not care
        def new(producer)
          config = producer.config.kafka.to_h
          client = ::Rdkafka::Config.new(config).producer

          # This callback is not global and is per client, thus we do not have to wrap it with a
          # callbacks manager to make it work
          client.delivery_callback = Instrumentation::Callbacks::Delivery.new(
            producer.id,
            producer.config.monitor
          )

          # Switch to the transactional mode if user provided the transactional id
          client.init_transactions if config.key?(:'transactional.id')

          client
        end
      end
    end
  end
end
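The transactional switch in the builder above relies on `config.key?(:'transactional.id')`, so it only fires when the settings hash carries that setting under a symbol key. The following is a minimal sketch of that check in isolation. `FakeClient` and `build_client` are hypothetical stand-ins introduced for illustration; they are not part of WaterDrop or rdkafka, and no real Kafka connection is made.

```ruby
# Hypothetical stand-in for the rdkafka producer; it only records calls.
class FakeClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def init_transactions
    @calls << :init_transactions
  end
end

# Mirrors the conditional from the builder: transactions are initialized
# only when the settings hash contains the :'transactional.id' symbol key.
# (Assumption: the settings are passed in as a plain Ruby Hash.)
def build_client(kafka_settings)
  client = FakeClient.new
  client.init_transactions if kafka_settings.key?(:'transactional.id')
  client
end

plain = build_client({ 'bootstrap.servers': 'localhost:9092' })
transactional = build_client(
  { 'bootstrap.servers': 'localhost:9092', 'transactional.id': 'tx-1' }
)
# A string key is a different Hash key than the symbol, so the check misses it.
string_keyed = build_client({ 'transactional.id' => 'tx-1' })
```

In this sketch only `transactional` records an `init_transactions` call. The `string_keyed` case shows why the key type matters: `Hash#key?` with a symbol does not match a string key of the same text.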
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
waterdrop-2.6.9 | lib/waterdrop/clients/rdkafka.rb |
waterdrop-2.6.8 | lib/waterdrop/clients/rdkafka.rb |