lib/rdkafka/config.rb in rdkafka-0.12.1 vs lib/rdkafka/config.rb in rdkafka-0.13.0.beta.1

- old
+ new

@@ -1,5 +1,7 @@ +# frozen_string_literal: true + require "logger" module Rdkafka # Configuration for a Kafka consumer or producer. You can create an instance and use # the consumer and producer methods to create a client. Documentation of the available @@ -28,11 +30,10 @@ # @return [Logger] def self.logger @@logger end - # Returns a queue whose contents will be passed to the configured logger. Each entry # should follow the format [Logger::Severity, String]. The benefit over calling the # logger directly is that this is safe to use from trap contexts. # # @return [Queue] @@ -45,11 +46,11 @@ # @param logger [Logger] The logger to be used # # @return [nil] def self.logger=(logger) raise NoLoggerError if logger.nil? - @@logger=logger + @@logger = logger end # Set a callback that will be called every time the underlying client emits statistics. # You can configure if and how often this happens using `statistics.interval.ms`. # The callback is called with a hash that's documented here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md @@ -177,11 +178,12 @@ # Create Kafka config config = native_config(opaque) # Set callback to receive delivery reports on config Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction) # Return producer with Kafka client - Rdkafka::Producer.new(Rdkafka::Producer::Client.new(native_kafka(config, :rd_kafka_producer)), self[:partitioner]).tap do |producer| + partitioner_name = self[:partitioner] || self["partitioner"] + Rdkafka::Producer.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer)), partitioner_name).tap do |producer| opaque.producer = producer end end # Create an admin instance with this configuration. @@ -192,11 +194,11 @@ # @return [Admin] The created admin instance def admin opaque = Opaque.new config = native_config(opaque) Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction) - Rdkafka::Admin.new(native_kafka(config, :rd_kafka_producer)) + Rdkafka::Admin.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer))) end # Error that is returned by the underlying rdkafka error if an invalid configuration option is present. class ConfigError < RuntimeError; end @@ -208,10 +210,10 @@ private # This method is only intended to be used to create a client, # using it in another way will leak memory. - def native_config(opaque=nil) + def native_config(opaque = nil) Rdkafka::Bindings.rd_kafka_conf_new.tap do |config| # Create config @config_hash.merge(REQUIRED_CONFIG).each do |key, value| error_buffer = FFI::MemoryPointer.from_string(" " * 256) result = Rdkafka::Bindings.rd_kafka_conf_set(