lib/rdkafka/config.rb in rdkafka-0.4.1 vs lib/rdkafka/config.rb in rdkafka-0.4.2

- old
+ new

@@ -5,11 +5,14 @@ # the consumer and producer methods to create a client. Documentation of the available # configuration options is available on https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. class Config # @private @@logger = Logger.new(STDOUT) + # @private @@statistics_callback = nil + # @private + @@opaques = {} # Returns the current logger, by default this is a logger to stdout. # # @return [Logger] def self.logger @@ -43,10 +46,15 @@ # @return [Proc, nil] def self.statistics_callback @@statistics_callback end + # @private + def self.opaques + @@opaques + end + # Default config that can be overwritten. DEFAULT_CONFIG = { # Request api version so advanced features work :"api.version.request" => true }.freeze @@ -104,16 +112,20 @@ # @raise [ConfigError] When the configuration contains invalid options # @raise [ClientCreationError] When the native client cannot be created # # @return [Producer] The created producer def producer + # Create opaque + opaque = Opaque.new # Create Kafka config - config = native_config + config = native_config(opaque) # Set callback to receive delivery reports on config Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Bindings::DeliveryCallback) # Return producer with Kafka client - Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer)) + Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer)).tap do |producer| + opaque.producer = producer + end end # Error that is returned by the underlying rdkafka error if an invalid configuration option is present. class ConfigError < RuntimeError; end @@ -125,12 +137,13 @@ private # This method is only intented to be used to create a client, # using it in another way will leak memory. - def native_config + def native_config(opaque=nil) Rdkafka::Bindings.rd_kafka_conf_new.tap do |config| + # Create config @config_hash.merge(REQUIRED_CONFIG).each do |key, value| error_buffer = FFI::MemoryPointer.from_string(" " * 256) result = Rdkafka::Bindings.rd_kafka_conf_set( config, key.to_s, @@ -140,14 +153,25 @@ ) unless result == :config_ok raise ConfigError.new(error_buffer.read_string) end end - # Set opaque pointer back to this config - #Rdkafka::Bindings.rd_kafka_conf_set_opaque(config, self) + + # Set opaque pointer that's used as a proxy for callbacks + if opaque + pointer = ::FFI::Pointer.new(:pointer, opaque.object_id) + Rdkafka::Bindings.rd_kafka_conf_set_opaque(config, pointer) + + # Store opaque with the pointer as key. We use this approach instead + # of trying to convert the pointer to a Ruby object because there is + # no risk of a segfault this way. + Rdkafka::Config.opaques[pointer.to_i] = opaque + end + # Set log callback Rdkafka::Bindings.rd_kafka_conf_set_log_cb(config, Rdkafka::Bindings::LogCallback) + # Set stats callback Rdkafka::Bindings.rd_kafka_conf_set_stats_cb(config, Rdkafka::Bindings::StatsCallback) end end @@ -172,8 +196,17 @@ FFI::AutoPointer.new( handle, Rdkafka::Bindings.method(:rd_kafka_destroy) ) + end + end + + # @private + class Opaque + attr_accessor :producer + + def call_delivery_callback(delivery_handle) + producer.call_delivery_callback(delivery_handle) if producer end end end