lib/rdkafka/config.rb in rdkafka-0.13.1 vs lib/rdkafka/config.rb in rdkafka-0.14.0.rc1
- old
+ new
@@ -1,22 +1,20 @@
# frozen_string_literal: true
-require "logger"
-
module Rdkafka
# Configuration for a Kafka consumer or producer. You can create an instance and use
# the consumer and producer methods to create a client. Documentation of the available
- # configuration options is available on https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
+ # configuration options is available on https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md.
class Config
# @private
@@logger = Logger.new(STDOUT)
# @private
@@statistics_callback = nil
# @private
@@error_callback = nil
# @private
- @@opaques = {}
+ @@opaques = ObjectSpace::WeakMap.new
# @private
@@log_queue = Queue.new
Thread.start do
loop do
@@ -51,17 +49,17 @@
@@logger = logger
end
# Set a callback that will be called every time the underlying client emits statistics.
# You can configure if and how often this happens using `statistics.interval.ms`.
- # The callback is called with a hash that's documented here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
+ # The callback is called with a hash that's documented here: https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md
#
# @param callback [Proc, #call] The callback
#
# @return [nil]
def self.statistics_callback=(callback)
- raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call)
+ raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback == nil
@@statistics_callback = callback
end
# Returns the current statistics callback, by default this is nil.
#
@@ -140,16 +138,16 @@
# @param listener [Object, #on_partitions_assigned, #on_partitions_revoked] listener instance
def consumer_rebalance_listener=(listener)
@consumer_rebalance_listener = listener
end
- # Create a consumer with this configuration.
+ # Creates a consumer with this configuration.
#
+ # @return [Consumer] The created consumer
+ #
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
- #
- # @return [Consumer] The created consumer
def consumer
opaque = Opaque.new
config = native_config(opaque)
if @consumer_rebalance_listener
@@ -162,43 +160,58 @@
# Redirect the main queue to the consumer
Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)
# Return consumer with Kafka client
- Rdkafka::Consumer.new(Rdkafka::NativeKafka.new(kafka, run_polling_thread: false))
+ Rdkafka::Consumer.new(
+ Rdkafka::NativeKafka.new(
+ kafka,
+ run_polling_thread: false,
+ opaque: opaque
+ )
+ )
end
# Create a producer with this configuration.
#
+ # @return [Producer] The created producer
+ #
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
- #
- # @return [Producer] The created producer
def producer
# Create opaque
opaque = Opaque.new
# Create Kafka config
config = native_config(opaque)
# Set callback to receive delivery reports on config
Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction)
# Return producer with Kafka client
partitioner_name = self[:partitioner] || self["partitioner"]
- Rdkafka::Producer.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true), partitioner_name).tap do |producer|
+ Rdkafka::Producer.new(
+ Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true, opaque: opaque),
+ partitioner_name
+ ).tap do |producer|
opaque.producer = producer
end
end
- # Create an admin instance with this configuration.
+ # Creates an admin instance with this configuration.
#
+ # @return [Admin] The created admin instance
+ #
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
- #
- # @return [Admin] The created admin instance
def admin
opaque = Opaque.new
config = native_config(opaque)
Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction)
- Rdkafka::Admin.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true))
+ Rdkafka::Admin.new(
+ Rdkafka::NativeKafka.new(
+ native_kafka(config, :rd_kafka_producer),
+ run_polling_thread: true,
+ opaque: opaque
+ )
+ )
end
# Error that is returned by the underlying rdkafka error if an invalid configuration option is present.
class ConfigError < RuntimeError; end