lib/rdkafka/config.rb in rdkafka-0.13.1 vs lib/rdkafka/config.rb in rdkafka-0.14.0.rc1

- old
+ new

@@ -1,22 +1,20 @@ # frozen_string_literal: true -require "logger" - module Rdkafka # Configuration for a Kafka consumer or producer. You can create an instance and use # the consumer and producer methods to create a client. Documentation of the available - # configuration options is available on https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. + # configuration options is available on https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md. class Config # @private @@logger = Logger.new(STDOUT) # @private @@statistics_callback = nil # @private @@error_callback = nil # @private - @@opaques = {} + @@opaques = ObjectSpace::WeakMap.new # @private @@log_queue = Queue.new Thread.start do loop do @@ -51,17 +49,17 @@ @@logger = logger end # Set a callback that will be called every time the underlying client emits statistics. # You can configure if and how often this happens using `statistics.interval.ms`. - # The callback is called with a hash that's documented here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md + # The callback is called with a hash that's documented here: https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md # # @param callback [Proc, #call] The callback # # @return [nil] def self.statistics_callback=(callback) - raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) + raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback == nil @@statistics_callback = callback end # Returns the current statistics callback, by default this is nil. # @@ -140,16 +138,16 @@ # @param listener [Object, #on_partitions_assigned, #on_partitions_revoked] listener instance def consumer_rebalance_listener=(listener) @consumer_rebalance_listener = listener end - # Create a consumer with this configuration. + # Creates a consumer with this configuration. # + # @return [Consumer] The created consumer + # # @raise [ConfigError] When the configuration contains invalid options # @raise [ClientCreationError] When the native client cannot be created - # - # @return [Consumer] The created consumer def consumer opaque = Opaque.new config = native_config(opaque) if @consumer_rebalance_listener @@ -162,43 +160,58 @@ # Redirect the main queue to the consumer Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) # Return consumer with Kafka client - Rdkafka::Consumer.new(Rdkafka::NativeKafka.new(kafka, run_polling_thread: false)) + Rdkafka::Consumer.new( + Rdkafka::NativeKafka.new( + kafka, + run_polling_thread: false, + opaque: opaque + ) + ) end # Create a producer with this configuration. # + # @return [Producer] The created producer + # # @raise [ConfigError] When the configuration contains invalid options # @raise [ClientCreationError] When the native client cannot be created - # - # @return [Producer] The created producer def producer # Create opaque opaque = Opaque.new # Create Kafka config config = native_config(opaque) # Set callback to receive delivery reports on config Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction) # Return producer with Kafka client partitioner_name = self[:partitioner] || self["partitioner"] - Rdkafka::Producer.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true), partitioner_name).tap do |producer| + Rdkafka::Producer.new( + Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true, opaque: opaque), + partitioner_name + ).tap do |producer| opaque.producer = producer end end - # Create an admin instance with this configuration. + # Creates an admin instance with this configuration. # + # @return [Admin] The created admin instance + # # @raise [ConfigError] When the configuration contains invalid options # @raise [ClientCreationError] When the native client cannot be created - # - # @return [Admin] The created admin instance def admin opaque = Opaque.new config = native_config(opaque) Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction) - Rdkafka::Admin.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true)) + Rdkafka::Admin.new( + Rdkafka::NativeKafka.new( + native_kafka(config, :rd_kafka_producer), + run_polling_thread: true, + opaque: opaque + ) + ) end # Error that is returned by the underlying rdkafka error if an invalid configuration option is present. class ConfigError < RuntimeError; end