lib/rdkafka/config.rb in rdkafka-0.4.1 vs lib/rdkafka/config.rb in rdkafka-0.4.2
- old
+ new
@@ -5,11 +5,14 @@
# the consumer and producer methods to create a client. Documentation of the available
# configuration options is available on https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
class Config
# @private
@@logger = Logger.new(STDOUT)
+ # @private
@@statistics_callback = nil
+ # @private
+ @@opaques = {}
# Returns the current logger, by default this is a logger to stdout.
#
# @return [Logger]
def self.logger
@@ -43,10 +46,15 @@
# @return [Proc, nil]
def self.statistics_callback
@@statistics_callback
end
+ # @private
+ def self.opaques
+ @@opaques
+ end
+
# Default config that can be overwritten.
DEFAULT_CONFIG = {
# Request api version so advanced features work
:"api.version.request" => true
}.freeze
@@ -104,16 +112,20 @@
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
#
# @return [Producer] The created producer
def producer
+ # Create opaque
+ opaque = Opaque.new
# Create Kafka config
- config = native_config
+ config = native_config(opaque)
# Set callback to receive delivery reports on config
Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Bindings::DeliveryCallback)
# Return producer with Kafka client
- Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer))
+ Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer)).tap do |producer|
+ opaque.producer = producer
+ end
end
# Error that is returned by the underlying rdkafka error if an invalid configuration option is present.
class ConfigError < RuntimeError; end
@@ -125,12 +137,13 @@
private
# This method is only intented to be used to create a client,
# using it in another way will leak memory.
- def native_config
+ def native_config(opaque=nil)
Rdkafka::Bindings.rd_kafka_conf_new.tap do |config|
+ # Create config
@config_hash.merge(REQUIRED_CONFIG).each do |key, value|
error_buffer = FFI::MemoryPointer.from_string(" " * 256)
result = Rdkafka::Bindings.rd_kafka_conf_set(
config,
key.to_s,
@@ -140,14 +153,25 @@
)
unless result == :config_ok
raise ConfigError.new(error_buffer.read_string)
end
end
- # Set opaque pointer back to this config
- #Rdkafka::Bindings.rd_kafka_conf_set_opaque(config, self)
+
+ # Set opaque pointer that's used as a proxy for callbacks
+ if opaque
+ pointer = ::FFI::Pointer.new(:pointer, opaque.object_id)
+ Rdkafka::Bindings.rd_kafka_conf_set_opaque(config, pointer)
+
+ # Store opaque with the pointer as key. We use this approach instead
+ # of trying to convert the pointer to a Ruby object because there is
+ # no risk of a segfault this way.
+ Rdkafka::Config.opaques[pointer.to_i] = opaque
+ end
+
# Set log callback
Rdkafka::Bindings.rd_kafka_conf_set_log_cb(config, Rdkafka::Bindings::LogCallback)
+
# Set stats callback
Rdkafka::Bindings.rd_kafka_conf_set_stats_cb(config, Rdkafka::Bindings::StatsCallback)
end
end
@@ -172,8 +196,17 @@
FFI::AutoPointer.new(
handle,
Rdkafka::Bindings.method(:rd_kafka_destroy)
)
+ end
+ end
+
+ # @private
+ class Opaque
+ attr_accessor :producer
+
+ def call_delivery_callback(delivery_handle)
+ producer.call_delivery_callback(delivery_handle) if producer
end
end
end