lib/rdkafka/consumer.rb in karafka-rdkafka-0.12.4 vs lib/rdkafka/consumer.rb in karafka-rdkafka-0.13.0.beta1

- old
+ new

@@ -1,5 +1,7 @@ +# frozen_string_literal: true + module Rdkafka # A consumer of Kafka messages. It uses the high-level consumer approach where the Kafka # brokers automatically assign partitions and load balance partitions over consumers that # have the same `:"group.id"` set in their configuration. # @@ -12,22 +14,37 @@ include Enumerable # @private def initialize(native_kafka) @native_kafka = native_kafka - @closing = false end + # @return [String] consumer name + def name + @name ||= @native_kafka.with_inner do |inner| + ::Rdkafka::Bindings.rd_kafka_name(inner) + end + end + + def finalizer + ->(_) { close } + end + # Close this consumer # @return [nil] def close - return unless @native_kafka + return if closed? + ObjectSpace.undefine_finalizer(self) + @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_consumer_close(inner) + end + @native_kafka.close + end - @closing = true - Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka) - Rdkafka::Bindings.rd_kafka_destroy(@native_kafka) - @native_kafka = nil + # Whether this consumer has closed + def closed? + @native_kafka.closed? end # Subscribe to one or more topics letting Kafka handle partition assignments. # # @param topics [Array<String>] One or more topic names @@ -44,11 +61,13 @@ topics.each do |topic| Rdkafka::Bindings.rd_kafka_topic_partition_list_add(tpl, topic, -1) end # Subscribe to topic partition list and check this was successful - response = Rdkafka::Bindings.rd_kafka_subscribe(@native_kafka, tpl) + response = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_subscribe(inner, tpl) + end if response != 0 raise Rdkafka::RdkafkaError.new(response, "Error subscribing to '#{topics.join(', ')}'") end ensure Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) unless tpl.nil? @@ -60,11 +79,13 @@ # # @return [nil] def unsubscribe closed_consumer_check(__method__) - response = Rdkafka::Bindings.rd_kafka_unsubscribe(@native_kafka) + response = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_unsubscribe(inner) + end if response != 0 raise Rdkafka::RdkafkaError.new(response) end end @@ -83,11 +104,13 @@ end tpl = list.to_native_tpl begin - response = Rdkafka::Bindings.rd_kafka_pause_partitions(@native_kafka, tpl) + response = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_pause_partitions(inner, tpl) + end if response != 0 list = TopicPartitionList.from_native_tpl(tpl) raise Rdkafka::RdkafkaTopicPartitionListError.new(response, list, "Error pausing '#{list.to_h}'") end @@ -111,11 +134,13 @@ end tpl = list.to_native_tpl begin - response = Rdkafka::Bindings.rd_kafka_resume_partitions(@native_kafka, tpl) + response = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_resume_partitions(inner, tpl) + end if response != 0 raise Rdkafka::RdkafkaError.new(response, "Error resume '#{list.to_h}'") end ensure Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) @@ -129,11 +154,13 @@ # @return [TopicPartitionList] def subscription closed_consumer_check(__method__) ptr = FFI::MemoryPointer.new(:pointer) - response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka, ptr) + response = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_subscription(inner, ptr) + end if response != 0 raise Rdkafka::RdkafkaError.new(response) end @@ -159,11 +186,13 @@ end tpl = list.to_native_tpl begin - response = Rdkafka::Bindings.rd_kafka_assign(@native_kafka, tpl) + response = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_assign(inner, tpl) + end if response != 0 raise Rdkafka::RdkafkaError.new(response, "Error assigning '#{list.to_h}'") end ensure Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) @@ -177,11 +206,13 @@ # @return [TopicPartitionList] def assignment closed_consumer_check(__method__) ptr = FFI::MemoryPointer.new(:pointer) - response = Rdkafka::Bindings.rd_kafka_assignment(@native_kafka, ptr) + response = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_assignment(inner, ptr) + end if response != 0 raise Rdkafka::RdkafkaError.new(response) end tpl = ptr.read_pointer @@ -195,10 +226,19 @@ end ensure ptr.free unless ptr.nil? end + # @return [Boolean] true if our current assignment has been lost involuntarily. + def assignment_lost? + closed_consumer_check(__method__) + + @native_kafka.with_inner do |inner| + !Rdkafka::Bindings.rd_kafka_assignment_lost(inner).zero? + end + end + # Return the current committed offset per partition for this consumer group. # The offset field of each requested partition will either be set to stored offset or to -1001 in case there was no stored offset for that partition. # # @param list [TopicPartitionList, nil] The topic with partitions to get the offsets for or nil to use the current subscription. # @param timeout_ms [Integer] The timeout for fetching this information. @@ -216,11 +256,13 @@ end tpl = list.to_native_tpl begin - response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka, tpl, timeout_ms) + response = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_committed(inner, tpl, timeout_ms) + end if response != 0 raise Rdkafka::RdkafkaError.new(response) end TopicPartitionList.from_native_tpl(tpl) ensure @@ -241,18 +283,20 @@ closed_consumer_check(__method__) low = FFI::MemoryPointer.new(:int64, 1) high = FFI::MemoryPointer.new(:int64, 1) - response = Rdkafka::Bindings.rd_kafka_query_watermark_offsets( - @native_kafka, - topic, - partition, - low, - high, - timeout_ms, - ) + response = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_query_watermark_offsets( + inner, + topic, + partition, + low, + high, + timeout_ms, + ) + end if response != 0 raise Rdkafka::RdkafkaError.new(response, "Error querying watermark offsets for partition #{partition} of #{topic}") end return low.read_array_of_int64(1).first, high.read_array_of_int64(1).first @@ -296,21 +340,25 @@ # Returns the ClusterId as reported in broker metadata. # # @return [String, nil] def cluster_id closed_consumer_check(__method__) - Rdkafka::Bindings.rd_kafka_clusterid(@native_kafka) + @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_clusterid(inner) + end end # Returns this client's broker-assigned group member id # # This currently requires the high-level KafkaConsumer # # @return [String, nil] def member_id closed_consumer_check(__method__) - Rdkafka::Bindings.rd_kafka_memberid(@native_kafka) + @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_memberid(inner) + end end # Store offset of a message to be used in the next commit of this consumer # # When using this `enable.auto.offset.store` should be set to `false` in the config. @@ -323,15 +371,17 @@ def store_offset(message) closed_consumer_check(__method__) # rd_kafka_offset_store is one of the few calls that does not support # a string as the topic, so create a native topic for it. - native_topic = Rdkafka::Bindings.rd_kafka_topic_new( - @native_kafka, - message.topic, - nil - ) + native_topic = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_topic_new( + inner, + message.topic, + nil + ) + end response = Rdkafka::Bindings.rd_kafka_offset_store( native_topic, message.partition, message.offset ) @@ -355,15 +405,17 @@ def seek(message) closed_consumer_check(__method__) # rd_kafka_offset_store is one of the few calls that does not support # a string as the topic, so create a native topic for it. - native_topic = Rdkafka::Bindings.rd_kafka_topic_new( - @native_kafka, - message.topic, - nil - ) + native_topic = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_topic_new( + inner, + message.topic, + nil + ) + end response = Rdkafka::Bindings.rd_kafka_seek( native_topic, message.partition, message.offset, 0 # timeout @@ -400,11 +452,13 @@ end tpl = list ? list.to_native_tpl : nil begin - response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka, tpl, async) + response = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_commit(inner, tpl, async) + end if response != 0 raise Rdkafka::RdkafkaError.new(response) end ensure Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl @@ -419,11 +473,13 @@ # # @return [Message, nil] A message or nil if there was no new message within the timeout def poll(timeout_ms) closed_consumer_check(__method__) - message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll(@native_kafka, timeout_ms) + message_ptr = @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_consumer_poll(inner, timeout_ms) + end if message_ptr.null? nil else # Create struct wrapper native_message = Rdkafka::Bindings::Message.new(message_ptr) @@ -434,11 +490,11 @@ # Create a message to pass out Rdkafka::Consumer::Message.new(native_message) end ensure # Clean up rdkafka message if there is one - if !message_ptr.nil? && !message_ptr.null? + if message_ptr && !message_ptr.null? Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr) end end # Poll for new messages and yield for each received one. Iteration @@ -457,23 +513,19 @@ loop do message = poll(250) if message yield(message) else - if @closing + if closed? break else next end end end end - def closed_consumer_check(method) - raise Rdkafka::ClosedConsumerError.new(method) if @native_kafka.nil? - end - # Poll for new messages and yield them in batches that may contain # messages from more than one partition. # # Rather than yield each message immediately as soon as it is received, # each_batch will attempt to wait for as long as `timeout_ms` in order @@ -525,11 +577,11 @@ closed_consumer_check(__method__) slice = [] bytes = 0 end_time = monotonic_now + timeout_ms / 1000.0 loop do - break if @closing + break if closed? max_wait = end_time - monotonic_now max_wait_ms = if max_wait <= 0 0 # should not block, but may retrieve a message else (max_wait * 1000).floor @@ -543,11 +595,11 @@ yield slice.dup, error raise end if message slice << message - bytes += message.payload.bytesize + bytes += message.payload.bytesize if message.payload end if slice.size == max_items || bytes >= bytes_threshold || monotonic_now >= end_time - 0.001 yield slice.dup, nil slice.clear bytes = 0 @@ -558,8 +610,12 @@ private def monotonic_now # needed because Time.now can go backwards Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + def closed_consumer_check(method) + raise Rdkafka::ClosedConsumerError.new(method) if closed? end end end