lib/rdkafka/consumer.rb in karafka-rdkafka-0.12.4 vs lib/rdkafka/consumer.rb in karafka-rdkafka-0.13.0.beta1
- lines removed (present only in 0.12.4, old)
+ lines added (present only in 0.13.0.beta1, new)
@@ -1,5 +1,7 @@
+# frozen_string_literal: true
+
module Rdkafka
# A consumer of Kafka messages. It uses the high-level consumer approach where the Kafka
# brokers automatically assign partitions and load balance partitions over consumers that
# have the same `:"group.id"` set in their configuration.
#
@@ -12,22 +14,37 @@
include Enumerable
# @private
def initialize(native_kafka)
@native_kafka = native_kafka
- @closing = false
end
+ # @return [String] consumer name
+ def name
+ @name ||= @native_kafka.with_inner do |inner|
+ ::Rdkafka::Bindings.rd_kafka_name(inner)
+ end
+ end
+
+ def finalizer
+ ->(_) { close }
+ end
+
# Close this consumer
# @return [nil]
def close
- return unless @native_kafka
+ return if closed?
+ ObjectSpace.undefine_finalizer(self)
+ @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_consumer_close(inner)
+ end
+ @native_kafka.close
+ end
- @closing = true
- Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka)
- Rdkafka::Bindings.rd_kafka_destroy(@native_kafka)
- @native_kafka = nil
+ # Whether this consumer has closed
+ def closed?
+ @native_kafka.closed?
end
# Subscribe to one or more topics letting Kafka handle partition assignments.
#
# @param topics [Array<String>] One or more topic names
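Note (illustrative, not part of the diff): in 0.13 the consumer no longer holds a raw FFI handle; every librdkafka call borrows it through `@native_kafka.with_inner`, and the old `@closing` flag is replaced by `closed?` on the wrapper. A minimal lifecycle sketch, assuming a broker on localhost:9092 and placeholder config values:

    require "rdkafka"

    consumer = Rdkafka::Config.new(
      "bootstrap.servers": "localhost:9092",
      "group.id": "example-group"
    ).consumer

    puts consumer.name   # librdkafka client name, e.g. "rdkafka#consumer-1"
    consumer.close       # idempotent: returns early once already closed
    consumer.closed?     # => true; most other calls now raise Rdkafka::ClosedConsumerError

The new `finalizer` method just returns a lambda that calls `close`; presumably it is registered via ObjectSpace.define_finalizer when the consumer is built (that code lives outside this file), which is why `close` starts by undefining the finalizer.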
@@ -44,11 +61,13 @@
topics.each do |topic|
Rdkafka::Bindings.rd_kafka_topic_partition_list_add(tpl, topic, -1)
end
# Subscribe to topic partition list and check this was successful
- response = Rdkafka::Bindings.rd_kafka_subscribe(@native_kafka, tpl)
+ response = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_subscribe(inner, tpl)
+ end
if response != 0
raise Rdkafka::RdkafkaError.new(response, "Error subscribing to '#{topics.join(', ')}'")
end
ensure
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) unless tpl.nil?
@@ -60,11 +79,13 @@
#
# @return [nil]
def unsubscribe
closed_consumer_check(__method__)
- response = Rdkafka::Bindings.rd_kafka_unsubscribe(@native_kafka)
+ response = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_unsubscribe(inner)
+ end
if response != 0
raise Rdkafka::RdkafkaError.new(response)
end
end
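Note: `subscribe` and `unsubscribe` only change in how they borrow the native handle; usage is unchanged. Topic names below are placeholders, reusing the consumer from the first note:

    consumer.subscribe("events", "logs")   # broker-managed partition assignment
    consumer.unsubscribe                   # leave all subscribed topics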
@@ -83,11 +104,13 @@
end
tpl = list.to_native_tpl
begin
- response = Rdkafka::Bindings.rd_kafka_pause_partitions(@native_kafka, tpl)
+ response = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_pause_partitions(inner, tpl)
+ end
if response != 0
list = TopicPartitionList.from_native_tpl(tpl)
raise Rdkafka::RdkafkaTopicPartitionListError.new(response, list, "Error pausing '#{list.to_h}'")
end
@@ -111,11 +134,13 @@
end
tpl = list.to_native_tpl
begin
- response = Rdkafka::Bindings.rd_kafka_resume_partitions(@native_kafka, tpl)
+ response = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_resume_partitions(inner, tpl)
+ end
if response != 0
          raise Rdkafka::RdkafkaError.new(response, "Error resuming '#{list.to_h}'")
end
ensure
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
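Note: `pause` and `resume` still take a TopicPartitionList; only the binding calls are wrapped. A sketch with a hypothetical topic and partitions:

    tpl = Rdkafka::Consumer::TopicPartitionList.new
    tpl.add_topic("events", [0, 1])   # partitions 0 and 1 of "events"
    consumer.pause(tpl)
    # ... drain in-flight work ...
    consumer.resume(tpl)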
@@ -129,11 +154,13 @@
# @return [TopicPartitionList]
def subscription
closed_consumer_check(__method__)
ptr = FFI::MemoryPointer.new(:pointer)
- response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka, ptr)
+ response = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_subscription(inner, ptr)
+ end
if response != 0
raise Rdkafka::RdkafkaError.new(response)
end
@@ -159,11 +186,13 @@
end
tpl = list.to_native_tpl
begin
- response = Rdkafka::Bindings.rd_kafka_assign(@native_kafka, tpl)
+ response = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_assign(inner, tpl)
+ end
if response != 0
raise Rdkafka::RdkafkaError.new(response, "Error assigning '#{list.to_h}'")
end
ensure
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
@@ -177,11 +206,13 @@
# @return [TopicPartitionList]
def assignment
closed_consumer_check(__method__)
ptr = FFI::MemoryPointer.new(:pointer)
- response = Rdkafka::Bindings.rd_kafka_assignment(@native_kafka, ptr)
+ response = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_assignment(inner, ptr)
+ end
if response != 0
raise Rdkafka::RdkafkaError.new(response)
end
tpl = ptr.read_pointer
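Note: `subscription` and `assignment` remain read-only views; for example (return values are illustrative):

    consumer.subscription.to_h.keys   # => ["events"]            topics subscribed to
    consumer.assignment.to_h          # => { "events" => [...] } partitions currently assigned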
@@ -195,10 +226,19 @@
end
ensure
ptr.free unless ptr.nil?
end
+ # @return [Boolean] true if our current assignment has been lost involuntarily.
+ def assignment_lost?
+ closed_consumer_check(__method__)
+
+ @native_kafka.with_inner do |inner|
+ !Rdkafka::Bindings.rd_kafka_assignment_lost(inner).zero?
+ end
+ end
+
# Return the current committed offset per partition for this consumer group.
    # The offset field of each requested partition will either be set to the stored offset or to -1001 if there was no stored offset for that partition.
#
# @param list [TopicPartitionList, nil] The topic with partitions to get the offsets for or nil to use the current subscription.
# @param timeout_ms [Integer] The timeout for fetching this information.
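Note: `assignment_lost?` is new in 0.13 and wraps librdkafka's rd_kafka_assignment_lost, reporting whether the group took this consumer's partitions away involuntarily (for example after a missed poll). One hedged way it might be used is to skip committing offsets that may no longer belong to this consumer:

    consumer.commit unless consumer.assignment_lost?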
@@ -216,11 +256,13 @@
end
tpl = list.to_native_tpl
begin
- response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka, tpl, timeout_ms)
+ response = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_committed(inner, tpl, timeout_ms)
+ end
if response != 0
raise Rdkafka::RdkafkaError.new(response)
end
TopicPartitionList.from_native_tpl(tpl)
ensure
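Note: apart from the wrapper, `committed` behaves as before: with no argument it queries the current assignment, and partitions without a stored offset come back as -1001. Illustrative dump:

    consumer.committed.to_h.each do |topic, partitions|
      partitions.each { |p| puts "#{topic}/#{p.partition} => #{p.offset}" }
    end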
@@ -241,18 +283,20 @@
closed_consumer_check(__method__)
low = FFI::MemoryPointer.new(:int64, 1)
high = FFI::MemoryPointer.new(:int64, 1)
- response = Rdkafka::Bindings.rd_kafka_query_watermark_offsets(
- @native_kafka,
- topic,
- partition,
- low,
- high,
- timeout_ms,
- )
+ response = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_query_watermark_offsets(
+ inner,
+ topic,
+ partition,
+ low,
+ high,
+ timeout_ms,
+ )
+ end
if response != 0
raise Rdkafka::RdkafkaError.new(response, "Error querying watermark offsets for partition #{partition} of #{topic}")
end
return low.read_array_of_int64(1).first, high.read_array_of_int64(1).first
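Note: `query_watermark_offsets` still returns the low and high watermarks as two values; topic and partition below are placeholders:

    low, high = consumer.query_watermark_offsets("events", 0)
    puts "partition 0 currently spans offsets #{low}..#{high}"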
@@ -296,21 +340,25 @@
# Returns the ClusterId as reported in broker metadata.
#
# @return [String, nil]
def cluster_id
closed_consumer_check(__method__)
- Rdkafka::Bindings.rd_kafka_clusterid(@native_kafka)
+ @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_clusterid(inner)
+ end
end
# Returns this client's broker-assigned group member id
#
# This currently requires the high-level KafkaConsumer
#
# @return [String, nil]
def member_id
closed_consumer_check(__method__)
- Rdkafka::Bindings.rd_kafka_memberid(@native_kafka)
+ @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_memberid(inner)
+ end
end
# Store offset of a message to be used in the next commit of this consumer
#
    # When using this, `enable.auto.offset.store` should be set to `false` in the config.
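Note: as the comment above says, manual offset storage is meant to be combined with `enable.auto.offset.store` set to `false` in the consumer config. A sketch of the usual pattern (`handle` is a placeholder for your own processing):

    consumer.each do |message|
      handle(message)
      consumer.store_offset(message)   # picked up by the next commit
    end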
@@ -323,15 +371,17 @@
def store_offset(message)
closed_consumer_check(__method__)
# rd_kafka_offset_store is one of the few calls that does not support
# a string as the topic, so create a native topic for it.
- native_topic = Rdkafka::Bindings.rd_kafka_topic_new(
- @native_kafka,
- message.topic,
- nil
- )
+ native_topic = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_topic_new(
+ inner,
+ message.topic,
+ nil
+ )
+ end
response = Rdkafka::Bindings.rd_kafka_offset_store(
native_topic,
message.partition,
message.offset
)
@@ -355,15 +405,17 @@
def seek(message)
closed_consumer_check(__method__)
      # rd_kafka_seek is one of the few calls that does not support
# a string as the topic, so create a native topic for it.
- native_topic = Rdkafka::Bindings.rd_kafka_topic_new(
- @native_kafka,
- message.topic,
- nil
- )
+ native_topic = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_topic_new(
+ inner,
+ message.topic,
+ nil
+ )
+ end
response = Rdkafka::Bindings.rd_kafka_seek(
native_topic,
message.partition,
message.offset,
0 # timeout
@@ -400,11 +452,13 @@
end
tpl = list ? list.to_native_tpl : nil
begin
- response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka, tpl, async)
+ response = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_commit(inner, tpl, async)
+ end
if response != 0
raise Rdkafka::RdkafkaError.new(response)
end
ensure
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
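Note: `commit` keeps its signature; with no list it commits the currently stored offsets, synchronously by default:

    consumer.commit             # synchronous commit
    consumer.commit(nil, true)  # asynchronous variant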
@@ -419,11 +473,13 @@
#
# @return [Message, nil] A message or nil if there was no new message within the timeout
def poll(timeout_ms)
closed_consumer_check(__method__)
- message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll(@native_kafka, timeout_ms)
+ message_ptr = @native_kafka.with_inner do |inner|
+ Rdkafka::Bindings.rd_kafka_consumer_poll(inner, timeout_ms)
+ end
if message_ptr.null?
nil
else
# Create struct wrapper
native_message = Rdkafka::Bindings::Message.new(message_ptr)
@@ -434,11 +490,11 @@
# Create a message to pass out
Rdkafka::Consumer::Message.new(native_message)
end
ensure
# Clean up rdkafka message if there is one
- if !message_ptr.nil? && !message_ptr.null?
+ if message_ptr && !message_ptr.null?
Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr)
end
end
# Poll for new messages and yield for each received one. Iteration
@@ -457,23 +513,19 @@
loop do
message = poll(250)
if message
yield(message)
else
- if @closing
+ if closed?
break
else
next
end
end
end
end
- def closed_consumer_check(method)
- raise Rdkafka::ClosedConsumerError.new(method) if @native_kafka.nil?
- end
-
# Poll for new messages and yield them in batches that may contain
# messages from more than one partition.
#
# Rather than yield each message immediately as soon as it is received,
# each_batch will attempt to wait for as long as `timeout_ms` in order
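Note: `poll` and `each` now consult `closed?` instead of the removed `@closing` flag (and `closed_consumer_check` has moved to the private section, testing `closed?` rather than `@native_kafka.nil?`), so an `each` loop terminates cleanly when the consumer is closed from another thread. Illustrative loop, with topic name and fields depending on your data:

    consumer.subscribe("events")
    consumer.each do |message|
      puts "#{message.topic}/#{message.partition} @#{message.offset}: #{message.payload}"
    end
    # returns once consumer.close is called elsewhere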
@@ -525,11 +577,11 @@
closed_consumer_check(__method__)
slice = []
bytes = 0
end_time = monotonic_now + timeout_ms / 1000.0
loop do
- break if @closing
+ break if closed?
max_wait = end_time - monotonic_now
max_wait_ms = if max_wait <= 0
0 # should not block, but may retrieve a message
else
(max_wait * 1000).floor
@@ -543,11 +595,11 @@
yield slice.dup, error
raise
end
if message
slice << message
- bytes += message.payload.bytesize
+ bytes += message.payload.bytesize if message.payload
end
if slice.size == max_items || bytes >= bytes_threshold || monotonic_now >= end_time - 0.001
yield slice.dup, nil
slice.clear
bytes = 0
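Note: `each_batch` gains the same `closed?` check, and the byte counter now skips messages with a nil payload (tombstones), which previously raised NoMethodError on `nil.bytesize`. A sketch, with `handle` again a placeholder:

    consumer.each_batch(max_items: 200, timeout_ms: 1_000) do |messages, error|
      # error is non-nil only when yield_on_error: true is passed (not shown in this hunk)
      messages.each { |message| handle(message) }
    end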
@@ -558,8 +610,12 @@
private
def monotonic_now
# needed because Time.now can go backwards
Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ end
+
+ def closed_consumer_check(method)
+ raise Rdkafka::ClosedConsumerError.new(method) if closed?
end
end
end