lib/rdkafka/config.rb in rdkafka-0.1.9 vs lib/rdkafka/config.rb in rdkafka-0.1.10
- old
+ new
@@ -1,8 +1,11 @@
require "logger"
module Rdkafka
+ # Configuration for a Kafka consumer or producer. You can create an instance and use
+ # the consumer and producer methods to create a client. Documentation of the available
+ # configuration options is available on https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
class Config
@@logger = Logger.new(STDOUT)
def self.logger
@@logger
@@ -11,13 +14,19 @@
def self.logger=(logger)
@@logger=logger
end
DEFAULT_CONFIG = {
+ # Request api version so advanced features work
:"api.version.request" => true
- }
+ }.freeze
+ REQUIRED_CONFIG = {
+ # Enable log queues so we get callbacks in our own threads
+ :"log.queue" => true
+ }.freeze
+
def initialize(config_hash = {})
@config_hash = DEFAULT_CONFIG.merge(config_hash)
end
def []=(key, value)
@@ -28,11 +37,13 @@
@config_hash[key]
end
def consumer
kafka = native_kafka(native_config, :rd_kafka_consumer)
+ # Redirect the main queue to the consumer
Rdkafka::FFI.rd_kafka_poll_set_consumer(kafka)
+ # Return consumer with Kafka client
Rdkafka::Consumer.new(kafka)
end
def producer
# Create Kafka config
@@ -50,11 +61,11 @@
# This method is only intented to be used to create a client,
# using it in another way will leak memory.
def native_config
Rdkafka::FFI.rd_kafka_conf_new.tap do |config|
- @config_hash.each do |key, value|
+ @config_hash.merge(REQUIRED_CONFIG).each do |key, value|
error_buffer = ::FFI::MemoryPointer.from_string(" " * 256)
result = Rdkafka::FFI.rd_kafka_conf_set(
config,
key.to_s,
value.to_s,
@@ -63,10 +74,11 @@
)
unless result == :config_ok
raise ConfigError.new(error_buffer.read_string)
end
end
+ # Set log callback
Rdkafka::FFI.rd_kafka_conf_set_log_cb(config, Rdkafka::FFI::LogCallback)
end
end
def native_kafka(config, type)
@@ -79,9 +91,15 @@
)
if handle.nil?
raise ClientCreationError.new(error_buffer.read_string)
end
+
+ # Redirect log to handle's queue
+ Rdkafka::FFI.rd_kafka_set_log_queue(
+ handle,
+ Rdkafka::FFI.rd_kafka_queue_get_main(handle)
+ )
::FFI::AutoPointer.new(
handle,
Rdkafka::FFI.method(:rd_kafka_destroy)
)