lib/rdkafka/config.rb in rdkafka-0.1.7 vs lib/rdkafka/config.rb in rdkafka-0.1.8
- old
+ new
@@ -1,7 +1,19 @@
+require "logger"
+
module Rdkafka
class Config
+ @@logger = Logger.new(STDOUT)
+
+ def self.logger
+ @@logger
+ end
+
+ def self.logger=(logger)
+ @@logger=logger
+ end
+
DEFAULT_CONFIG = {
:"api.version.request" => true
}
def initialize(config_hash = {})
@@ -37,26 +49,25 @@
private
# This method is only intented to be used to create a client,
# using it in another way will leak memory.
def native_config
- config = Rdkafka::FFI.rd_kafka_conf_new
-
- @config_hash.each do |key, value|
- error_buffer = ::FFI::MemoryPointer.from_string(" " * 256)
- result = Rdkafka::FFI.rd_kafka_conf_set(
- config,
- key.to_s,
- value.to_s,
- error_buffer,
- 256
- )
- unless result == :config_ok
- raise ConfigError.new(error_buffer.read_string)
+ Rdkafka::FFI.rd_kafka_conf_new.tap do |config|
+ @config_hash.each do |key, value|
+ error_buffer = ::FFI::MemoryPointer.from_string(" " * 256)
+ result = Rdkafka::FFI.rd_kafka_conf_set(
+ config,
+ key.to_s,
+ value.to_s,
+ error_buffer,
+ 256
+ )
+ unless result == :config_ok
+ raise ConfigError.new(error_buffer.read_string)
+ end
end
+ Rdkafka::FFI.rd_kafka_conf_set_log_cb(config, Rdkafka::FFI::LogCallback)
end
-
- config
end
def native_kafka(config, type)
error_buffer = ::FFI::MemoryPointer.from_string(" " * 256)
handle = Rdkafka::FFI.rd_kafka_new(