lib/rdkafka/config.rb in rdkafka-0.9.0 vs lib/rdkafka/config.rb in rdkafka-0.10.0
- old
+ new
@@ -8,10 +8,12 @@
# @private
@@logger = Logger.new(STDOUT)
# @private
@@statistics_callback = nil
# @private
+ @@error_callback = nil
+ # @private
@@opaques = {}
# @private
@@log_queue = Queue.new
Thread.start do
@@ -26,10 +28,11 @@
# @return [Logger]
def self.logger
@@logger
end
+
# Returns a queue whose contents will be passed to the configured logger. Each entry
# should follow the format [Logger::Severity, String]. The benefit over calling the
# logger directly is that this is safe to use from trap contexts.
#
# @return [Queue]
@@ -64,10 +67,29 @@
# @return [Proc, nil]
def self.statistics_callback
@@statistics_callback
end
+ # Set a callback that will be called every time the underlying client emits an error.
+ # If this callback is not set, global errors such as brokers becoming unavailable will only be sent to the logger, as defined by librdkafka.
+ # The callback is called with an instance of RdKafka::Error.
+ #
+ # @param callback [Proc, #call] The callback
+ #
+ # @return [nil]
+ def self.error_callback=(callback)
+ raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call)
+ @@error_callback = callback
+ end
+
+ # Returns the current error callback, by default this is nil.
+ #
+ # @return [Proc, nil]
+ def self.error_callback
+ @@error_callback
+ end
+
# @private
def self.opaques
@@opaques
end
@@ -219,9 +241,12 @@
# Set log callback
Rdkafka::Bindings.rd_kafka_conf_set_log_cb(config, Rdkafka::Bindings::LogCallback)
# Set stats callback
Rdkafka::Bindings.rd_kafka_conf_set_stats_cb(config, Rdkafka::Bindings::StatsCallback)
+
+ # Set error callback
+ Rdkafka::Bindings.rd_kafka_conf_set_error_cb(config, Rdkafka::Bindings::ErrorCallback)
end
end
def native_kafka(config, type)
error_buffer = FFI::MemoryPointer.from_string(" " * 256)