lib/rdkafka/config.rb in rdkafka-0.1.7 vs lib/rdkafka/config.rb in rdkafka-0.1.8

- old
+ new

@@ -1,7 +1,19 @@ +require "logger" + module Rdkafka class Config + @@logger = Logger.new(STDOUT) + + def self.logger + @@logger + end + + def self.logger=(logger) + @@logger=logger + end + DEFAULT_CONFIG = { :"api.version.request" => true } def initialize(config_hash = {}) @@ -37,26 +49,25 @@ private # This method is only intented to be used to create a client, # using it in another way will leak memory. def native_config - config = Rdkafka::FFI.rd_kafka_conf_new - - @config_hash.each do |key, value| - error_buffer = ::FFI::MemoryPointer.from_string(" " * 256) - result = Rdkafka::FFI.rd_kafka_conf_set( - config, - key.to_s, - value.to_s, - error_buffer, - 256 - ) - unless result == :config_ok - raise ConfigError.new(error_buffer.read_string) + Rdkafka::FFI.rd_kafka_conf_new.tap do |config| + @config_hash.each do |key, value| + error_buffer = ::FFI::MemoryPointer.from_string(" " * 256) + result = Rdkafka::FFI.rd_kafka_conf_set( + config, + key.to_s, + value.to_s, + error_buffer, + 256 + ) + unless result == :config_ok + raise ConfigError.new(error_buffer.read_string) + end end + Rdkafka::FFI.rd_kafka_conf_set_log_cb(config, Rdkafka::FFI::LogCallback) end - - config end def native_kafka(config, type) error_buffer = ::FFI::MemoryPointer.from_string(" " * 256) handle = Rdkafka::FFI.rd_kafka_new(