lib/rdkafka/producer.rb in karafka-rdkafka-0.15.0 vs lib/rdkafka/producer.rb in karafka-rdkafka-0.15.2
- old
+ new
@@ -7,12 +7,20 @@
include Helpers::OAuth
# Cache partitions count for 30 seconds
PARTITIONS_COUNT_TTL = 30
- private_constant :PARTITIONS_COUNT_TTL
+ # Empty hash used as a default
+ EMPTY_HASH = {}.freeze
+ private_constant :PARTITIONS_COUNT_TTL, :EMPTY_HASH
+
+ # Raised when there was a critical issue when invoking rd_kafka_topic_new
+ # This is a temporary solution until https://github.com/karafka/rdkafka-ruby/issues/451 is
+ # resolved and this is normalized in all the places
+ class TopicHandleCreationError < RuntimeError; end
+
# @private
# Returns the current delivery callback, by default this is nil.
#
# @return [Proc, nil]
attr_reader :delivery_callback
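The new error class is raised from the topic-handle creation path introduced later in this diff (set_topic_config and the produce flow). A hedged sketch of how a caller might guard against it, assuming an existing Rdkafka::Producer instance named producer and an illustrative topic config:

    begin
      producer.produce(topic: "events", payload: "x", topic_config: { "acks" => "all" })
    rescue Rdkafka::Producer::TopicHandleCreationError => e
      # Surfaces when rd_kafka_topic_new returns NULL for this topic/config pair
      warn "could not create topic handle: #{e.message}"
    end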
@@ -26,10 +34,12 @@
# @private
# @param native_kafka [NativeKafka]
# @param partitioner_name [String, nil] name of the partitioner we want to use or nil to use
# the "consistent_random" default
def initialize(native_kafka, partitioner_name)
+ @topics_refs_map = {}
+ @topics_configs = {}
@native_kafka = native_kafka
@partitioner_name = partitioner_name || "consistent_random"
# Makes sure, that native kafka gets closed before it gets GCed by Ruby
ObjectSpace.define_finalizer(self, native_kafka.finalizer)
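Both new instance variables are nested hashes keyed first by topic name and then by Hash#hash of the per-topic config, which is what the later dig calls rely on. A minimal sketch of the expected shape, with illustrative topic and config values that are not taken from this diff:

    default_cfg = {}
    custom_cfg  = { "acks" => "all" }

    # @topics_configs keeps the plain Ruby config per (topic, config hash) pair,
    # @topics_refs_map keeps the matching native topic reference under the same keys.
    topics_configs = {
      "events" => {
        default_cfg.hash => default_cfg,
        custom_cfg.hash  => custom_cfg
      }
    }
    topics_refs_map = {
      "events" => {
        default_cfg.hash => :native_topic_handle, # an FFI pointer in the real producer
        custom_cfg.hash  => :native_topic_handle
      }
    }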
@@ -52,10 +62,56 @@
[monotonic_now - PARTITIONS_COUNT_TTL + 5, partition_count]
end
end
end
+ # Sets alternative set of configuration details that can be set per topic
+ # @note It is not allowed to re-set the same topic config twice because of the underlying
+ # librdkafka caching
+ # @param topic [String] The topic name
+ # @param config [Hash] config we want to use per topic basis
+ # @param config_hash [Integer] hash of the config. We expect it here instead of computing it,
+ # because it is already computed during the retrieval attempt in the `#produce` flow.
+ def set_topic_config(topic, config, config_hash)
+ # Ensure lock on topic reference just in case
+ @native_kafka.with_inner do |inner|
+ @topics_refs_map[topic] ||= {}
+ @topics_configs[topic] ||= {}
+
+ return if @topics_configs[topic].key?(config_hash)
+
+ # If config is empty, we create an empty reference that will be used with defaults
+ rd_topic_config = if config.empty?
+ nil
+ else
+ Rdkafka::Bindings.rd_kafka_topic_conf_new.tap do |topic_config|
+ config.each do |key, value|
+ error_buffer = FFI::MemoryPointer.new(:char, 256)
+ result = Rdkafka::Bindings.rd_kafka_topic_conf_set(
+ topic_config,
+ key.to_s,
+ value.to_s,
+ error_buffer,
+ 256
+ )
+
+ unless result == :config_ok
+ raise Config::ConfigError.new(error_buffer.read_string)
+ end
+ end
+ end
+ end
+
+ topic_handle = Bindings.rd_kafka_topic_new(inner, topic, rd_topic_config)
+
+ raise TopicHandleCreationError.new("Error creating topic handle for topic #{topic}") if topic_handle.null?
+
+ @topics_configs[topic][config_hash] = config
+ @topics_refs_map[topic][config_hash] = topic_handle
+ end
+ end
+
# Starts the native Kafka polling thread and kicks off the init polling
# @note Not needed to run unless explicit start was disabled
def start
@native_kafka.start
end
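Within #produce this method is invoked lazily, but it can also be called directly to pre-register a per-topic config. A sketch under the assumption that producer is an existing Rdkafka::Producer and that the option below is a valid librdkafka topic-level property:

    config = { "message.timeout.ms" => "30000" }

    begin
      # Builds and caches a native topic handle for this (topic, config) pair;
      # invalid properties raise Rdkafka::Config::ConfigError via rd_kafka_topic_conf_set,
      # and a NULL handle raises the new TopicHandleCreationError.
      producer.set_topic_config("events", config, config.hash)
    rescue Rdkafka::Config::ConfigError => e
      warn "invalid topic config: #{e.message}"
    end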
@@ -149,11 +205,22 @@
# Close this producer and wait for the internal poll queue to empty.
def close
return if closed?
ObjectSpace.undefine_finalizer(self)
- @native_kafka.close
+
+ @native_kafka.close do
+ # We need to remove the topics references objects before we destroy the producer,
+ # otherwise they would leak out
+ @topics_refs_map.each_value do |refs|
+ refs.each_value do |ref|
+ Rdkafka::Bindings.rd_kafka_topic_destroy(ref)
+ end
+ end
+ end
+
+ @topics_refs_map.clear
end
# Whether this producer has closed
def closed?
@native_kafka.closed?
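Since the cached topic references are destroyed inside the close block, right before the producer handle itself, callers that care about in-flight messages should still flush first. A short sketch; flush is part of the existing producer API and the timeout value is an assumption:

    producer.flush(5_000) # wait up to ~5s for queued deliveries to be handled
    producer.close        # destroys cached topic refs, then the native producer
    producer.closed?      # => true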
@@ -242,15 +309,26 @@
# @param partition [Integer,nil] Optional partition to produce to
# @param partition_key [String, nil] Optional partition key based on which partition assignment can happen
# @param timestamp [Time,Integer,nil] Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970.
# @param headers [Hash<String,String>] Optional message headers
# @param label [Object, nil] a label that can be assigned when producing a message that will be part of the delivery handle and the delivery report
+ # @param topic_config [Hash] topic config for given message dispatch. Allows to send messages to topics with different configuration
#
# @return [DeliveryHandle] Delivery handle that can be used to wait for the result of producing this message
#
# @raise [RdkafkaError] When adding the message to rdkafka's queue failed
- def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil, label: nil)
+ def produce(
+ topic:,
+ payload: nil,
+ key: nil,
+ partition: nil,
+ partition_key: nil,
+ timestamp: nil,
+ headers: nil,
+ label: nil,
+ topic_config: EMPTY_HASH
+ )
closed_producer_check(__method__)
# Start by checking and converting the input
# Get payload length
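A hedged usage sketch of the extended signature, with an illustrative topic name and config: producing to the same topic with different topic_config hashes results in one cached native topic reference per distinct config.

    # Default config (EMPTY_HASH) -> first cached reference for "events"
    producer.produce(topic: "events", payload: "fast path")

    # Alternative per-topic config -> second cached reference for "events"
    handle = producer.produce(
      topic: "events",
      payload: "needs full acknowledgement",
      topic_config: { "acks" => "all" }
    )
    handle.wait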
@@ -265,12 +343,24 @@
0
else
key.bytesize
end
+ topic_config_hash = topic_config.hash
+
+ # Checks if we have the rdkafka topic reference object ready. It saves us on object
+ # allocation and allows to use custom config on demand.
+ set_topic_config(topic, topic_config, topic_config_hash) unless @topics_refs_map.dig(topic, topic_config_hash)
+ topic_ref = @topics_refs_map.dig(topic, topic_config_hash)
+
if partition_key
partition_count = partition_count(topic)
+
+ # Check if there are no overrides for the partitioner and use the default one only when
+ # no per-topic is present.
+ partitioner_name = @topics_configs.dig(topic, topic_config_hash, :partitioner) || @partitioner_name
+
# If the topic is not present, set to -1
- partition = Rdkafka::Bindings.partitioner(partition_key, partition_count, @partitioner_name) if partition_count.positive?
+ partition = Rdkafka::Bindings.partitioner(partition_key, partition_count, partitioner_name) if partition_count.positive?
end
# If partition is nil, use -1 to let librdkafka set the partition randomly or
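With a partition_key present, the partitioner can now also be overridden per dispatch through the :partitioner key of topic_config; when the key is absent, the producer-wide partitioner_name is used. A sketch assuming one of the partitioner names accepted by Rdkafka::Bindings.partitioner:

    producer.produce(
      topic: "events",
      payload: "user event",
      partition_key: "user-42",
      topic_config: { partitioner: "murmur2_random" }
    )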
@@ -296,10 +386,10 @@
delivery_handle[:partition] = -1
delivery_handle[:offset] = -1
DeliveryHandle.register(delivery_handle)
args = [
- :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TOPIC, :string, topic,
+ :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_RKT, :pointer, topic_ref,
:int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY,
:int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size,
:int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size,
:int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition,
:int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, raw_timestamp,