lib/rdkafka/config.rb in rdkafka-0.15.2 vs lib/rdkafka/config.rb in rdkafka-0.16.0.beta1

- old
+ new

@@ -13,25 +13,43 @@ @@error_callback = nil # @private @@opaques = ObjectSpace::WeakMap.new # @private @@log_queue = Queue.new + # We memoize thread on the first log flush + # This allows us also to restart logger thread on forks + @@log_thread = nil + # @private + @@log_mutex = Mutex.new + # @private + @@oauthbearer_token_refresh_callback = nil - Thread.start do - loop do - severity, msg = @@log_queue.pop - @@logger.add(severity, msg) - end - end - # Returns the current logger, by default this is a logger to stdout. # # @return [Logger] def self.logger @@logger end + # Makes sure that there is a thread for consuming logs + # We do not spawn thread immediately and we need to check if it operates to support forking + def self.ensure_log_thread + return if @@log_thread && @@log_thread.alive? + + @@log_mutex.synchronize do + # Restart if dead (fork, crash) + @@log_thread = nil if @@log_thread && !@@log_thread.alive? + + @@log_thread ||= Thread.start do + loop do + severity, msg = @@log_queue.pop + @@logger.add(severity, msg) + end + end + end + end + # Returns a queue whose contents will be passed to the configured logger. Each entry # should follow the format [Logger::Severity, String]. The benefit over calling the # logger directly is that this is safe to use from trap contexts. # # @return [Queue] @@ -85,10 +103,28 @@ # @return [Proc, nil] def self.error_callback @@error_callback end + # Sets the SASL/OAUTHBEARER token refresh callback. + # This callback will be triggered when it is time to refresh the client's OAUTHBEARER token + # + # @param callback [Proc, #call] The callback + # + # @return [nil] + def self.oauthbearer_token_refresh_callback=(callback) + raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback == nil + @@oauthbearer_token_refresh_callback = callback + end + + # Returns the current oauthbearer_token_refresh_callback callback, by default this is nil. + # + # @return [Proc, nil] + def self.oauthbearer_token_refresh_callback + @@oauthbearer_token_refresh_callback + end + # @private def self.opaques @@opaques end @@ -157,15 +193,17 @@ @consumer_poll_set = poll_set end # Creates a consumer with this configuration. # + # @param native_kafka_auto_start [Boolean] should the native kafka operations be started + # automatically. Defaults to true. Set to false only when doing complex initialization. # @return [Consumer] The created consumer # # @raise [ConfigError] When the configuration contains invalid options # @raise [ClientCreationError] When the native client cannot be created - def consumer + def consumer(native_kafka_auto_start: true) opaque = Opaque.new config = native_config(opaque) if @consumer_rebalance_listener opaque.consumer_rebalance_listener = @consumer_rebalance_listener @@ -181,57 +219,70 @@ # Return consumer with Kafka client Rdkafka::Consumer.new( Rdkafka::NativeKafka.new( kafka, run_polling_thread: false, - opaque: opaque + opaque: opaque, + auto_start: native_kafka_auto_start ) ) end # Create a producer with this configuration. # + # @param native_kafka_auto_start [Boolean] should the native kafka operations be started + # automatically. Defaults to true. Set to false only when doing complex initialization. # @return [Producer] The created producer # # @raise [ConfigError] When the configuration contains invalid options # @raise [ClientCreationError] When the native client cannot be created - def producer + def producer(native_kafka_auto_start: true) # Create opaque opaque = Opaque.new # Create Kafka config config = native_config(opaque) # Set callback to receive delivery reports on config Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction) # Return producer with Kafka client partitioner_name = self[:partitioner] || self["partitioner"] + + kafka = native_kafka(config, :rd_kafka_producer) + Rdkafka::Producer.new( Rdkafka::NativeKafka.new( - native_kafka(config, :rd_kafka_producer), + kafka, run_polling_thread: true, - opaque: opaque + opaque: opaque, + auto_start: native_kafka_auto_start ), partitioner_name ).tap do |producer| opaque.producer = producer end end # Creates an admin instance with this configuration. # + # @param native_kafka_auto_start [Boolean] should the native kafka operations be started + # automatically. Defaults to true. Set to false only when doing complex initialization. # @return [Admin] The created admin instance # # @raise [ConfigError] When the configuration contains invalid options # @raise [ClientCreationError] When the native client cannot be created - def admin + def admin(native_kafka_auto_start: true) opaque = Opaque.new config = native_config(opaque) Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction) + + kafka = native_kafka(config, :rd_kafka_producer) + Rdkafka::Admin.new( Rdkafka::NativeKafka.new( - native_kafka(config, :rd_kafka_producer), + kafka, run_polling_thread: true, - opaque: opaque + opaque: opaque, + auto_start: native_kafka_auto_start ) ) end # Error that is returned by the underlying rdkafka error if an invalid configuration option is present. @@ -281,9 +332,12 @@ # Set stats callback Rdkafka::Bindings.rd_kafka_conf_set_stats_cb(config, Rdkafka::Bindings::StatsCallback) # Set error callback Rdkafka::Bindings.rd_kafka_conf_set_error_cb(config, Rdkafka::Bindings::ErrorCallback) + + # Set oauth callback + Rdkafka::Bindings.rd_kafka_conf_set_oauthbearer_token_refresh_cb(config, Rdkafka::Bindings::OAuthbearerTokenRefreshCallback) end end def native_kafka(config, type) error_buffer = FFI::MemoryPointer.from_string(" " * 256)