lib/rdkafka/producer.rb in karafka-rdkafka-0.12.4 vs lib/rdkafka/producer.rb in karafka-rdkafka-0.13.0.beta1
- old
+ new
@@ -1,10 +1,17 @@
+# frozen_string_literal: true
+
require "objspace"
module Rdkafka
# A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that.
class Producer
+ # Cache partitions count for 30 seconds
+ PARTITIONS_COUNT_TTL = 30
+
+ private_constant :PARTITIONS_COUNT_TTL
+
# @private
# Returns the current delivery callback, by default this is nil.
#
# @return [Proc, nil]
attr_reader :delivery_callback
@@ -14,18 +21,38 @@
#
# @return [Integer, nil]
attr_reader :delivery_callback_arity
# @private
- def initialize(client, partitioner_name)
- @client = client
+ def initialize(native_kafka, partitioner_name)
+ @native_kafka = native_kafka
@partitioner_name = partitioner_name || "consistent_random"
- # Makes sure, that the producer gets closed before it gets GCed by Ruby
- ObjectSpace.define_finalizer(self, client.finalizer)
+ # Makes sure, that native kafka gets closed before it gets GCed by Ruby
+ ObjectSpace.define_finalizer(self, native_kafka.finalizer)
+
+ @_partitions_count_cache = Hash.new do |cache, topic|
+ topic_metadata = nil
+
+ @native_kafka.with_inner do |inner|
+ topic_metadata = ::Rdkafka::Metadata.new(inner, topic).topics&.first
+ end
+
+ cache[topic] = [
+ monotonic_now,
+ topic_metadata ? topic_metadata[:partition_count] : nil
+ ]
+ end
end
+ # @return [String] producer name
+ def name
+ @name ||= @native_kafka.with_inner do |inner|
+ ::Rdkafka::Bindings.rd_kafka_name(inner)
+ end
+ end
+
# Set a callback that will be called every time a message is successfully produced.
# The callback is called with a {DeliveryReport} and {DeliveryHandle}
#
# @param callback [Proc, #call] The callback
#
@@ -36,25 +63,53 @@
@delivery_callback_arity = arity(callback)
end
# Close this producer and wait for the internal poll queue to empty.
def close
+ return if closed?
ObjectSpace.undefine_finalizer(self)
+ @native_kafka.close
+ end
- @client.close
+ # Whether this producer has closed
+ def closed?
+ @native_kafka.closed?
end
+ # Wait until all outstanding producer requests are completed, with the given timeout
+ # in seconds. Call this before closing a producer to ensure delivery of all messages.
+ #
+ # @param timeout_ms [Integer] how long should we wait for flush of all messages
+ def flush(timeout_ms=5_000)
+ closed_producer_check(__method__)
+
+ @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_flush(inner, timeout_ms)
+ end
+ end
+
# Partition count for a given topic.
# NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil.
#
# @param topic [String] The topic name.
#
# @return partition count [Integer,nil]
#
+ # We cache the partition count for a given topic for given time
+ # This prevents us in case someone uses `partition_key` from querying for the count with
+ # each message. Instead we query once every 30 seconds at most
+ #
+ # @param topic [String] topic name
+ # @return [Integer] partition count for a given topic
def partition_count(topic)
closed_producer_check(__method__)
- Rdkafka::Metadata.new(@client.native, topic).topics&.first[:partition_count]
+
+ @_partitions_count_cache.delete_if do |_, cached|
+ monotonic_now - cached.first > PARTITIONS_COUNT_TTL
+ end
+
+ @_partitions_count_cache[topic].last
end
# Produces a message to a Kafka topic. The message is added to rdkafka's queue, call {DeliveryHandle#wait wait} on the returned delivery handle to make sure it is delivered.
#
# When no partition is specified the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition will be used.
@@ -141,25 +196,26 @@
end
args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_END
# Produce the message
- response = Rdkafka::Bindings.rd_kafka_producev(
- @client.native,
- *args
- )
+ response = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_producev(
+ inner,
+ *args
+ )
+ end
# Raise error if the produce call was not successful
if response != 0
DeliveryHandle.remove(delivery_handle.to_ptr.address)
raise RdkafkaError.new(response)
end
delivery_handle
end
- # @private
def call_delivery_callback(delivery_report, delivery_handle)
return unless @delivery_callback
args = [delivery_report, delivery_handle].take(@delivery_callback_arity)
@delivery_callback.call(*args)
@@ -169,10 +225,17 @@
return callback.arity if callback.respond_to?(:arity)
callback.method(:call).arity
end
+ private
+
+ def monotonic_now
+ # needed because Time.now can go backwards
+ Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ end
+
def closed_producer_check(method)
- raise Rdkafka::ClosedProducerError.new(method) if @client.closed?
+ raise Rdkafka::ClosedProducerError.new(method) if closed?
end
end
end