lib/rdkafka/config.rb in rdkafka-0.12.1 vs lib/rdkafka/config.rb in rdkafka-0.13.0.beta.1
- old
+ new
@@ -1,5 +1,7 @@
+# frozen_string_literal: true
+
require "logger"
module Rdkafka
# Configuration for a Kafka consumer or producer. You can create an instance and use
# the consumer and producer methods to create a client. Documentation of the available
@@ -28,11 +30,10 @@
# @return [Logger]
def self.logger
@@logger
end
-
# Returns a queue whose contents will be passed to the configured logger. Each entry
# should follow the format [Logger::Severity, String]. The benefit over calling the
# logger directly is that this is safe to use from trap contexts.
#
# @return [Queue]
@@ -45,11 +46,11 @@
# @param logger [Logger] The logger to be used
#
# @return [nil]
def self.logger=(logger)
raise NoLoggerError if logger.nil?
- @@logger=logger
+ @@logger = logger
end
# Set a callback that will be called every time the underlying client emits statistics.
# You can configure if and how often this happens using `statistics.interval.ms`.
# The callback is called with a hash that's documented here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
@@ -177,11 +178,12 @@
# Create Kafka config
config = native_config(opaque)
# Set callback to receive delivery reports on config
Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction)
# Return producer with Kafka client
- Rdkafka::Producer.new(Rdkafka::Producer::Client.new(native_kafka(config, :rd_kafka_producer)), self[:partitioner]).tap do |producer|
+ partitioner_name = self[:partitioner] || self["partitioner"]
+ Rdkafka::Producer.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer)), partitioner_name).tap do |producer|
opaque.producer = producer
end
end
# Create an admin instance with this configuration.
@@ -192,11 +194,11 @@
# @return [Admin] The created admin instance
def admin
opaque = Opaque.new
config = native_config(opaque)
Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction)
- Rdkafka::Admin.new(native_kafka(config, :rd_kafka_producer))
+ Rdkafka::Admin.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer)))
end
# Error that is returned by the underlying rdkafka error if an invalid configuration option is present.
class ConfigError < RuntimeError; end
@@ -208,10 +210,10 @@
private
# This method is only intended to be used to create a client,
# using it in another way will leak memory.
- def native_config(opaque=nil)
+ def native_config(opaque = nil)
Rdkafka::Bindings.rd_kafka_conf_new.tap do |config|
# Create config
@config_hash.merge(REQUIRED_CONFIG).each do |key, value|
error_buffer = FFI::MemoryPointer.from_string(" " * 256)
result = Rdkafka::Bindings.rd_kafka_conf_set(