lib/rdkafka/producer.rb in karafka-rdkafka-0.12.4 vs lib/rdkafka/producer.rb in karafka-rdkafka-0.13.0.beta1

- old
+ new

@@ -1,10 +1,17 @@ +# frozen_string_literal: true + require "objspace" module Rdkafka # A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that. class Producer + # Cache partitions count for 30 seconds + PARTITIONS_COUNT_TTL = 30 + + private_constant :PARTITIONS_COUNT_TTL + # @private # Returns the current delivery callback, by default this is nil. # # @return [Proc, nil] attr_reader :delivery_callback @@ -14,18 +21,38 @@ # # @return [Integer, nil] attr_reader :delivery_callback_arity # @private - def initialize(client, partitioner_name) - @client = client + def initialize(native_kafka, partitioner_name) + @native_kafka = native_kafka @partitioner_name = partitioner_name || "consistent_random" - # Makes sure, that the producer gets closed before it gets GCed by Ruby - ObjectSpace.define_finalizer(self, client.finalizer) + # Makes sure, that native kafka gets closed before it gets GCed by Ruby + ObjectSpace.define_finalizer(self, native_kafka.finalizer) + + @_partitions_count_cache = Hash.new do |cache, topic| + topic_metadata = nil + + @native_kafka.with_inner do |inner| + topic_metadata = ::Rdkafka::Metadata.new(inner, topic).topics&.first + end + + cache[topic] = [ + monotonic_now, + topic_metadata ? topic_metadata[:partition_count] : nil + ] + end end + # @return [String] producer name + def name + @name ||= @native_kafka.with_inner do |inner| + ::Rdkafka::Bindings.rd_kafka_name(inner) + end + end + # Set a callback that will be called every time a message is successfully produced. # The callback is called with a {DeliveryReport} and {DeliveryHandle} # # @param callback [Proc, #call] The callback # @@ -36,25 +63,53 @@ @delivery_callback_arity = arity(callback) end # Close this producer and wait for the internal poll queue to empty. def close + return if closed? ObjectSpace.undefine_finalizer(self) + @native_kafka.close + end - @client.close + # Whether this producer has closed + def closed? + @native_kafka.closed? end + # Wait until all outstanding producer requests are completed, with the given timeout + # in seconds. Call this before closing a producer to ensure delivery of all messages. + # + # @param timeout_ms [Integer] how long should we wait for flush of all messages + def flush(timeout_ms=5_000) + closed_producer_check(__method__) + + @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_flush(inner, timeout_ms) + end + end + # Partition count for a given topic. # NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil. # # @param topic [String] The topic name. # # @return partition count [Integer,nil] # + # We cache the partition count for a given topic for given time + # This prevents us in case someone uses `partition_key` from querying for the count with + # each message. Instead we query once every 30 seconds at most + # + # @param topic [String] topic name + # @return [Integer] partition count for a given topic def partition_count(topic) closed_producer_check(__method__) - Rdkafka::Metadata.new(@client.native, topic).topics&.first[:partition_count] + + @_partitions_count_cache.delete_if do |_, cached| + monotonic_now - cached.first > PARTITIONS_COUNT_TTL + end + + @_partitions_count_cache[topic].last end # Produces a message to a Kafka topic. The message is added to rdkafka's queue, call {DeliveryHandle#wait wait} on the returned delivery handle to make sure it is delivered. # # When no partition is specified the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition will be used. @@ -141,25 +196,26 @@ end args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_END # Produce the message - response = Rdkafka::Bindings.rd_kafka_producev( - @client.native, - *args - ) + response = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_producev( + inner, + *args + ) + end # Raise error if the produce call was not successful if response != 0 DeliveryHandle.remove(delivery_handle.to_ptr.address) raise RdkafkaError.new(response) end delivery_handle end - # @private def call_delivery_callback(delivery_report, delivery_handle) return unless @delivery_callback args = [delivery_report, delivery_handle].take(@delivery_callback_arity) @delivery_callback.call(*args) @@ -169,10 +225,17 @@ return callback.arity if callback.respond_to?(:arity) callback.method(:call).arity end + private + + def monotonic_now + # needed because Time.now can go backwards + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + def closed_producer_check(method) - raise Rdkafka::ClosedProducerError.new(method) if @client.closed? + raise Rdkafka::ClosedProducerError.new(method) if closed? end end end