lib/rdkafka/config.rb in rdkafka-0.15.2 vs lib/rdkafka/config.rb in rdkafka-0.16.0.beta1
- old
+ new
@@ -13,25 +13,43 @@
@@error_callback = nil
# @private
@@opaques = ObjectSpace::WeakMap.new
# @private
@@log_queue = Queue.new
+ # We memoize thread on the first log flush
+ # This allows us also to restart logger thread on forks
+ @@log_thread = nil
+ # @private
+ @@log_mutex = Mutex.new
+ # @private
+ @@oauthbearer_token_refresh_callback = nil
- Thread.start do
- loop do
- severity, msg = @@log_queue.pop
- @@logger.add(severity, msg)
- end
- end
-
# Returns the current logger, by default this is a logger to stdout.
#
# @return [Logger]
def self.logger
@@logger
end
+ # Makes sure that there is a thread for consuming logs
+ # We do not spawn thread immediately and we need to check if it operates to support forking
+ def self.ensure_log_thread
+ return if @@log_thread && @@log_thread.alive?
+
+ @@log_mutex.synchronize do
+ # Restart if dead (fork, crash)
+ @@log_thread = nil if @@log_thread && !@@log_thread.alive?
+
+ @@log_thread ||= Thread.start do
+ loop do
+ severity, msg = @@log_queue.pop
+ @@logger.add(severity, msg)
+ end
+ end
+ end
+ end
+
# Returns a queue whose contents will be passed to the configured logger. Each entry
# should follow the format [Logger::Severity, String]. The benefit over calling the
# logger directly is that this is safe to use from trap contexts.
#
# @return [Queue]
@@ -85,10 +103,28 @@
# @return [Proc, nil]
def self.error_callback
@@error_callback
end
+ # Sets the SASL/OAUTHBEARER token refresh callback.
+ # This callback will be triggered when it is time to refresh the client's OAUTHBEARER token
+ #
+ # @param callback [Proc, #call] The callback
+ #
+ # @return [nil]
+ def self.oauthbearer_token_refresh_callback=(callback)
+ raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback == nil
+ @@oauthbearer_token_refresh_callback = callback
+ end
+
+ # Returns the current oauthbearer_token_refresh_callback callback, by default this is nil.
+ #
+ # @return [Proc, nil]
+ def self.oauthbearer_token_refresh_callback
+ @@oauthbearer_token_refresh_callback
+ end
+
# @private
def self.opaques
@@opaques
end
@@ -157,15 +193,17 @@
@consumer_poll_set = poll_set
end
# Creates a consumer with this configuration.
#
+ # @param native_kafka_auto_start [Boolean] should the native kafka operations be started
+ # automatically. Defaults to true. Set to false only when doing complex initialization.
# @return [Consumer] The created consumer
#
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
- def consumer
+ def consumer(native_kafka_auto_start: true)
opaque = Opaque.new
config = native_config(opaque)
if @consumer_rebalance_listener
opaque.consumer_rebalance_listener = @consumer_rebalance_listener
@@ -181,57 +219,70 @@
# Return consumer with Kafka client
Rdkafka::Consumer.new(
Rdkafka::NativeKafka.new(
kafka,
run_polling_thread: false,
- opaque: opaque
+ opaque: opaque,
+ auto_start: native_kafka_auto_start
)
)
end
# Create a producer with this configuration.
#
+ # @param native_kafka_auto_start [Boolean] should the native kafka operations be started
+ # automatically. Defaults to true. Set to false only when doing complex initialization.
# @return [Producer] The created producer
#
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
- def producer
+ def producer(native_kafka_auto_start: true)
# Create opaque
opaque = Opaque.new
# Create Kafka config
config = native_config(opaque)
# Set callback to receive delivery reports on config
Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction)
# Return producer with Kafka client
partitioner_name = self[:partitioner] || self["partitioner"]
+
+ kafka = native_kafka(config, :rd_kafka_producer)
+
Rdkafka::Producer.new(
Rdkafka::NativeKafka.new(
- native_kafka(config, :rd_kafka_producer),
+ kafka,
run_polling_thread: true,
- opaque: opaque
+ opaque: opaque,
+ auto_start: native_kafka_auto_start
),
partitioner_name
).tap do |producer|
opaque.producer = producer
end
end
# Creates an admin instance with this configuration.
#
+ # @param native_kafka_auto_start [Boolean] should the native kafka operations be started
+ # automatically. Defaults to true. Set to false only when doing complex initialization.
# @return [Admin] The created admin instance
#
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
- def admin
+ def admin(native_kafka_auto_start: true)
opaque = Opaque.new
config = native_config(opaque)
Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction)
+
+ kafka = native_kafka(config, :rd_kafka_producer)
+
Rdkafka::Admin.new(
Rdkafka::NativeKafka.new(
- native_kafka(config, :rd_kafka_producer),
+ kafka,
run_polling_thread: true,
- opaque: opaque
+ opaque: opaque,
+ auto_start: native_kafka_auto_start
)
)
end
# Error that is returned by the underlying rdkafka error if an invalid configuration option is present.
@@ -281,9 +332,12 @@
# Set stats callback
Rdkafka::Bindings.rd_kafka_conf_set_stats_cb(config, Rdkafka::Bindings::StatsCallback)
# Set error callback
Rdkafka::Bindings.rd_kafka_conf_set_error_cb(config, Rdkafka::Bindings::ErrorCallback)
+
+ # Set oauth callback
+ Rdkafka::Bindings.rd_kafka_conf_set_oauthbearer_token_refresh_cb(config, Rdkafka::Bindings::OAuthbearerTokenRefreshCallback)
end
end
def native_kafka(config, type)
error_buffer = FFI::MemoryPointer.from_string(" " * 256)