lib/rdkafka/config.rb in rdkafka-0.9.0 vs lib/rdkafka/config.rb in rdkafka-0.10.0

- old
+ new

@@ -8,10 +8,12 @@ # @private @@logger = Logger.new(STDOUT) # @private @@statistics_callback = nil # @private + @@error_callback = nil + # @private @@opaques = {} # @private @@log_queue = Queue.new Thread.start do @@ -26,10 +28,11 @@ # @return [Logger] def self.logger @@logger end + # Returns a queue whose contents will be passed to the configured logger. Each entry # should follow the format [Logger::Severity, String]. The benefit over calling the # logger directly is that this is safe to use from trap contexts. # # @return [Queue] @@ -64,10 +67,29 @@ # @return [Proc, nil] def self.statistics_callback @@statistics_callback end + # Set a callback that will be called every time the underlying client emits an error. + # If this callback is not set, global errors such as brokers becoming unavailable will only be sent to the logger, as defined by librdkafka. + # The callback is called with an instance of RdKafka::Error. + # + # @param callback [Proc, #call] The callback + # + # @return [nil] + def self.error_callback=(callback) + raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) + @@error_callback = callback + end + + # Returns the current error callback, by default this is nil. + # + # @return [Proc, nil] + def self.error_callback + @@error_callback + end + # @private def self.opaques @@opaques end @@ -219,9 +241,12 @@ # Set log callback Rdkafka::Bindings.rd_kafka_conf_set_log_cb(config, Rdkafka::Bindings::LogCallback) # Set stats callback Rdkafka::Bindings.rd_kafka_conf_set_stats_cb(config, Rdkafka::Bindings::StatsCallback) + + # Set error callback + Rdkafka::Bindings.rd_kafka_conf_set_error_cb(config, Rdkafka::Bindings::ErrorCallback) end end def native_kafka(config, type) error_buffer = FFI::MemoryPointer.from_string(" " * 256)