lib/rdkafka/config.rb in rdkafka-0.3.5 vs lib/rdkafka/config.rb in rdkafka-0.4.0
- old
+ new
@@ -5,10 +5,11 @@
# the consumer and producer methods to create a client. Documentation of the available
# configuration options is available on https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
class Config
# @private
@@logger = Logger.new(STDOUT)
+ @@statistics_callback = nil
# Returns the current logger, by default this is a logger to stdout.
#
# @return [Logger]
def self.logger
@@ -23,10 +24,29 @@
def self.logger=(logger)
raise NoLoggerError if logger.nil?
@@logger=logger
end
+ # Set a callback that will be called every time the underlying client emits statistics.
+ # You can configure if and how often this happens using `statistics.interval.ms`.
+ # The callback is called with a hash that's documented here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
+ #
+ # @param callback [Proc] The callback
+ #
+ # @return [nil]
+ def self.statistics_callback=(callback)
+ raise TypeError.new("Callback has to be a proc or lambda") unless callback.is_a? Proc
+ @@statistics_callback = callback
+ end
+
+ # Returns the current statistics callback, by default this is nil.
+ #
+ # @return [Proc, nil]
+ def self.statistics_callback
+ @@statistics_callback
+ end
+
# Default config that can be overwritten.
DEFAULT_CONFIG = {
# Request api version so advanced features work
:"api.version.request" => true
}.freeze
@@ -120,11 +140,15 @@
)
unless result == :config_ok
raise ConfigError.new(error_buffer.read_string)
end
end
+ # Set opaque pointer back to this config
+ #Rdkafka::Bindings.rd_kafka_conf_set_opaque(config, self)
# Set log callback
Rdkafka::Bindings.rd_kafka_conf_set_log_cb(config, Rdkafka::Bindings::LogCallback)
+ # Set stats callback
+ Rdkafka::Bindings.rd_kafka_conf_set_stats_cb(config, Rdkafka::Bindings::StatsCallback)
end
end
def native_kafka(config, type)
error_buffer = FFI::MemoryPointer.from_string(" " * 256)