lib/rdkafka/config.rb in rdkafka-0.1.9 vs lib/rdkafka/config.rb in rdkafka-0.1.10

- old
+ new

@@ -1,8 +1,11 @@ require "logger" module Rdkafka + # Configuration for a Kafka consumer or producer. You can create an instance and use + # the consumer and producer methods to create a client. Documentation of the available + # configuration options is available on https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. class Config @@logger = Logger.new(STDOUT) def self.logger @@logger @@ -11,13 +14,19 @@ def self.logger=(logger) @@logger=logger end DEFAULT_CONFIG = { + # Request api version so advanced features work :"api.version.request" => true - } + }.freeze + REQUIRED_CONFIG = { + # Enable log queues so we get callbacks in our own threads + :"log.queue" => true + }.freeze + def initialize(config_hash = {}) @config_hash = DEFAULT_CONFIG.merge(config_hash) end def []=(key, value) @@ -28,11 +37,13 @@ @config_hash[key] end def consumer kafka = native_kafka(native_config, :rd_kafka_consumer) + # Redirect the main queue to the consumer Rdkafka::FFI.rd_kafka_poll_set_consumer(kafka) + # Return consumer with Kafka client Rdkafka::Consumer.new(kafka) end def producer # Create Kafka config @@ -50,11 +61,11 @@ # This method is only intented to be used to create a client, # using it in another way will leak memory. def native_config Rdkafka::FFI.rd_kafka_conf_new.tap do |config| - @config_hash.each do |key, value| + @config_hash.merge(REQUIRED_CONFIG).each do |key, value| error_buffer = ::FFI::MemoryPointer.from_string(" " * 256) result = Rdkafka::FFI.rd_kafka_conf_set( config, key.to_s, value.to_s, @@ -63,10 +74,11 @@ ) unless result == :config_ok raise ConfigError.new(error_buffer.read_string) end end + # Set log callback Rdkafka::FFI.rd_kafka_conf_set_log_cb(config, Rdkafka::FFI::LogCallback) end end def native_kafka(config, type) @@ -79,9 +91,15 @@ ) if handle.nil? raise ClientCreationError.new(error_buffer.read_string) end + + # Redirect log to handle's queue + Rdkafka::FFI.rd_kafka_set_log_queue( + handle, + Rdkafka::FFI.rd_kafka_queue_get_main(handle) + ) ::FFI::AutoPointer.new( handle, Rdkafka::FFI.method(:rd_kafka_destroy) )