lib/rdkafka/producer.rb in karafka-rdkafka-0.15.0 vs lib/rdkafka/producer.rb in karafka-rdkafka-0.15.2

- removed (present in karafka-rdkafka-0.15.0 only)
+ added (present in karafka-rdkafka-0.15.2 only)

@@ -7,12 +7,20 @@
     include Helpers::OAuth
 
     # Cache partitions count for 30 seconds
     PARTITIONS_COUNT_TTL = 30
 
-    private_constant :PARTITIONS_COUNT_TTL
+    # Empty hash used as a default
+    EMPTY_HASH = {}.freeze
+    private_constant :PARTITIONS_COUNT_TTL, :EMPTY_HASH
+
+    # Raised when there was a critical issue when invoking rd_kafka_topic_new
+    # This is a temporary solution until https://github.com/karafka/rdkafka-ruby/issues/451 is
+    # resolved and this is normalized in all the places
+    class TopicHandleCreationError < RuntimeError; end
+
     # @private
     # Returns the current delivery callback, by default this is nil.
     #
     # @return [Proc, nil]
     attr_reader :delivery_callback
@@ -26,10 +34,12 @@
     # @private
     # @param native_kafka [NativeKafka]
     # @param partitioner_name [String, nil] name of the partitioner we want to use or nil to use
     #   the "consistent_random" default
     def initialize(native_kafka, partitioner_name)
+      @topics_refs_map = {}
+      @topics_configs = {}
       @native_kafka = native_kafka
       @partitioner_name = partitioner_name || "consistent_random"
 
       # Makes sure, that native kafka gets closed before it gets GCed by Ruby
       ObjectSpace.define_finalizer(self, native_kafka.finalizer)
@@ -52,10 +62,56 @@
           [monotonic_now - PARTITIONS_COUNT_TTL + 5, partition_count]
         end
       end
     end
 
+    # Sets alternative set of configuration details that can be set per topic
+    # @note It is not allowed to re-set the same topic config twice because of the underlying
+    #   librdkafka caching
+    # @param topic [String] The topic name
+    # @param config [Hash] config we want to use per topic basis
+    # @param config_hash [Integer] hash of the config. We expect it here instead of computing it,
+    #   because it is already computed during the retrieval attempt in the `#produce` flow.
+    def set_topic_config(topic, config, config_hash)
+      # Ensure lock on topic reference just in case
+      @native_kafka.with_inner do |inner|
+        @topics_refs_map[topic] ||= {}
+        @topics_configs[topic] ||= {}
+
+        return if @topics_configs[topic].key?(config_hash)
+
+        # If config is empty, we create an empty reference that will be used with defaults
+        rd_topic_config = if config.empty?
+          nil
+        else
+          Rdkafka::Bindings.rd_kafka_topic_conf_new.tap do |topic_config|
+            config.each do |key, value|
+              error_buffer = FFI::MemoryPointer.new(:char, 256)
+              result = Rdkafka::Bindings.rd_kafka_topic_conf_set(
+                topic_config,
+                key.to_s,
+                value.to_s,
+                error_buffer,
+                256
+              )
+
+              unless result == :config_ok
+                raise Config::ConfigError.new(error_buffer.read_string)
+              end
+            end
+          end
+        end
+
+        topic_handle = Bindings.rd_kafka_topic_new(inner, topic, rd_topic_config)
+
+        raise TopicHandleCreationError.new("Error creating topic handle for topic #{topic}") if topic_handle.null?
+
+        @topics_configs[topic][config_hash] = config
+        @topics_refs_map[topic][config_hash] = topic_handle
+      end
+    end
+
     # Starts the native Kafka polling thread and kicks off the init polling
     # @note Not needed to run unless explicit start was disabled
     def start
       @native_kafka.start
     end
@@ -149,11 +205,22 @@
     # Close this producer and wait for the internal poll queue to empty.
     def close
       return if closed?
       ObjectSpace.undefine_finalizer(self)
-      @native_kafka.close
+
+      @native_kafka.close do
+        # We need to remove the topics references objects before we destroy the producer,
+        # otherwise they would leak out
+        @topics_refs_map.each_value do |refs|
+          refs.each_value do |ref|
+            Rdkafka::Bindings.rd_kafka_topic_destroy(ref)
+          end
+        end
+      end
+
+      @topics_refs_map.clear
     end
 
     # Whether this producer has closed
     def closed?
       @native_kafka.closed?
@@ -242,15 +309,26 @@
     # @param partition [Integer,nil] Optional partition to produce to
     # @param partition_key [String, nil] Optional partition key based on which partition assignment can happen
     # @param timestamp [Time,Integer,nil] Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970.
     # @param headers [Hash<String,String>] Optional message headers
     # @param label [Object, nil] a label that can be assigned when producing a message that will be part of the delivery handle and the delivery report
+    # @param topic_config [Hash] topic config for given message dispatch. Allows to send messages to topics with different configuration
     #
     # @return [DeliveryHandle] Delivery handle that can be used to wait for the result of producing this message
     #
     # @raise [RdkafkaError] When adding the message to rdkafka's queue failed
-    def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil, label: nil)
+    def produce(
+      topic:,
+      payload: nil,
+      key: nil,
+      partition: nil,
+      partition_key: nil,
+      timestamp: nil,
+      headers: nil,
+      label: nil,
+      topic_config: EMPTY_HASH
+    )
       closed_producer_check(__method__)
 
       # Start by checking and converting the input
 
       # Get payload length
@@ -265,12 +343,24 @@
         0
       else
         key.bytesize
       end
 
+      topic_config_hash = topic_config.hash
+
+      # Checks if we have the rdkafka topic reference object ready. It saves us on object
+      # allocation and allows to use custom config on demand.
+      set_topic_config(topic, topic_config, topic_config_hash) unless @topics_refs_map.dig(topic, topic_config_hash)
+      topic_ref = @topics_refs_map.dig(topic, topic_config_hash)
+
       if partition_key
         partition_count = partition_count(topic)
+
+        # Check if there are no overrides for the partitioner and use the default one only when
+        # no per-topic is present.
+        partitioner_name = @topics_configs.dig(topic, topic_config_hash, :partitioner) || @partitioner_name
+
         # If the topic is not present, set to -1
         partition = Rdkafka::Bindings.partitioner(partition_key, partition_count, @partitioner_name) if partition_count.positive?
       end
 
       # If partition is nil, use -1 to let librdafka set the partition randomly or
@@ -296,10 +386,10 @@
       delivery_handle[:partition] = -1
       delivery_handle[:offset] = -1
       DeliveryHandle.register(delivery_handle)
       args = [
-        :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TOPIC, :string, topic,
+        :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_RKT, :pointer, topic_ref,
         :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY,
         :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size,
        :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size,
        :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition,
        :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, raw_timestamp,
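
Taken together, the new code path works like this: `#produce` hashes the (possibly empty) `topic_config:` argument, calls `set_topic_config` the first time a given topic/config-hash pair is seen in order to build and cache a native topic handle through `rd_kafka_topic_new`, then passes that handle to the produce call via `RD_KAFKA_VTYPE_RKT` instead of the plain topic name string, and `#close` destroys every cached handle before the native producer is closed. The Ruby sketch below is a hedged usage example, not part of the diff: the broker address, topic name, payloads and config values are illustrative assumptions.

require "rdkafka"

# Placeholder broker address and topic name; any reachable cluster behaves the same.
producer = Rdkafka::Config.new("bootstrap.servers" => "localhost:9092").producer

# Without topic_config:, the frozen EMPTY_HASH default is used, so a topic handle
# with librdkafka defaults is created once and cached.
producer.produce(topic: "events", payload: "plain message").wait

# With topic_config:, the hash value of the config acts as the cache key: this exact
# combination is registered once through set_topic_config and the cached
# rd_kafka_topic_t reference is reused for later dispatches.
# "acks" and "message.timeout.ms" are librdkafka topic-level properties.
handle = producer.produce(
  topic: "events",
  payload: "durable message",
  topic_config: {
    "acks" => "all",
    "message.timeout.ms" => 30_000
  }
)
handle.wait

# close destroys the cached topic references (rd_kafka_topic_destroy) inside the
# @native_kafka.close block before the native producer itself goes away.
producer.close

Because the cache key is `topic_config.hash`, two hashes with identical contents share one cached handle, while different contents create and cache a new one; per the `@note` on `set_topic_config`, a given topic/config pair is never re-registered.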