lib/rdkafka/config.rb in rdkafka-0.3.5 vs lib/rdkafka/config.rb in rdkafka-0.4.0

- old
+ new

@@ -5,10 +5,11 @@ # the consumer and producer methods to create a client. Documentation of the available # configuration options is available on https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. class Config # @private @@logger = Logger.new(STDOUT) + @@statistics_callback = nil # Returns the current logger, by default this is a logger to stdout. # # @return [Logger] def self.logger @@ -23,10 +24,29 @@ def self.logger=(logger) raise NoLoggerError if logger.nil? @@logger=logger end + # Set a callback that will be called every time the underlying client emits statistics. + # You can configure if and how often this happens using `statistics.interval.ms`. + # The callback is called with a hash that's documented here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md + # + # @param callback [Proc] The callback + # + # @return [nil] + def self.statistics_callback=(callback) + raise TypeError.new("Callback has to be a proc or lambda") unless callback.is_a? Proc + @@statistics_callback = callback + end + + # Returns the current statistics callback, by default this is nil. + # + # @return [Proc, nil] + def self.statistics_callback + @@statistics_callback + end + # Default config that can be overwritten. DEFAULT_CONFIG = { # Request api version so advanced features work :"api.version.request" => true }.freeze @@ -120,11 +140,15 @@ ) unless result == :config_ok raise ConfigError.new(error_buffer.read_string) end end + # Set opaque pointer back to this config + #Rdkafka::Bindings.rd_kafka_conf_set_opaque(config, self) # Set log callback Rdkafka::Bindings.rd_kafka_conf_set_log_cb(config, Rdkafka::Bindings::LogCallback) + # Set stats callback + Rdkafka::Bindings.rd_kafka_conf_set_stats_cb(config, Rdkafka::Bindings::StatsCallback) end end def native_kafka(config, type) error_buffer = FFI::MemoryPointer.from_string(" " * 256)