module Rdkafka # A consumer of Kafka messages. It uses the high-level consumer approach where the Kafka # brokers automatically assign partitions and load balance partitions over consumers that # have the same `:"group.id"` set in their configuration. # # To create a consumer set up a {Config} and call {Config#consumer consumer} on that. It is # mandatory to set `:"group.id"` in the configuration. class Consumer include Enumerable # @private def initialize(native_kafka) @native_kafka = native_kafka end # Close this consumer # @return [nil] def close Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka) end # Subscribe to one or more topics letting Kafka handle partition assignments. # # @param topics [Array] One or more topic names # # @raise [RdkafkaError] When subscribing fails # # @return [nil] def subscribe(*topics) # Create topic partition list with topics and no partition set tpl = Rdkafka::Bindings.rd_kafka_topic_partition_list_new(topics.length) topics.each do |topic| Rdkafka::Bindings.rd_kafka_topic_partition_list_add( tpl, topic, -1 ) end # Subscribe to topic partition list and check this was successful response = Rdkafka::Bindings.rd_kafka_subscribe(@native_kafka, tpl) if response != 0 raise Rdkafka::RdkafkaError.new(response) end ensure # Clean up the topic partition list Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) end # Unsubscribe from all subscribed topics. # # @raise [RdkafkaError] When unsubscribing fails # # @return [nil] def unsubscribe response = Rdkafka::Bindings.rd_kafka_unsubscribe(@native_kafka) if response != 0 raise Rdkafka::RdkafkaError.new(response) end end # Return the current subscription to topics and partitions # # @raise [RdkafkaError] When getting the subscription fails. # # @return [TopicPartitionList] def subscription tpl = FFI::MemoryPointer.new(:pointer) response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka, tpl) if response != 0 raise Rdkafka::RdkafkaError.new(response) end Rdkafka::Consumer::TopicPartitionList.new(tpl.get_pointer(0)) end # Return the current committed offset per partition for this consumer group. # The offset field of each requested partition will either be set to stored offset or to -1001 in case there was no stored offset for that partition. # # @param list [TopicPartitionList] The topic with partitions to get the offsets for. # @param timeout_ms [Integer] The timeout for fetching this information. # # @raise [RdkafkaError] When getting the committed positions fails. # # @return [TopicPartitionList] def committed(list, timeout_ms=200) unless list.is_a?(TopicPartitionList) raise TypeError.new("list has to be a TopicPartitionList") end tpl = list.copy_tpl response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka, tpl, timeout_ms) if response != 0 raise Rdkafka::RdkafkaError.new(response) end Rdkafka::Consumer::TopicPartitionList.new(tpl) end # Query broker for low (oldest/beginning) and high (newest/end) offsets for a partition. # # @param topic [String] The topic to query # @param partition [Integer] The partition to query # @param timeout_ms [Integer] The timeout for querying the broker # # @raise [RdkafkaError] When querying the broker fails. # # @return [Integer] The low and high watermark def query_watermark_offsets(topic, partition, timeout_ms=200) low = FFI::MemoryPointer.new(:int64, 1) high = FFI::MemoryPointer.new(:int64, 1) response = Rdkafka::Bindings.rd_kafka_query_watermark_offsets( @native_kafka, topic, partition, low, high, timeout_ms ) if response != 0 raise Rdkafka::RdkafkaError.new(response) end return low.read_int, high.read_int end # Commit the current offsets of this consumer # # @param async [Boolean] Whether to commit async or wait for the commit to finish # # @raise [RdkafkaError] When comitting fails # # @return [nil] def commit(async=false) response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka, nil, async) if response != 0 raise Rdkafka::RdkafkaError.new(response) end end # Poll for the next message on one of the subscribed topics # # @param timeout_ms [Integer] Timeout of this poll # # @raise [RdkafkaError] When polling fails # # @return [Message, nil] A message or nil if there was no new message within the timeout def poll(timeout_ms) message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll(@native_kafka, timeout_ms) if message_ptr.null? nil else # Create struct wrapper native_message = Rdkafka::Bindings::Message.new(message_ptr) # Raise error if needed if native_message[:err] != 0 raise Rdkafka::RdkafkaError.new(native_message[:err]) end # Create a message to pass out Rdkafka::Consumer::Message.new(native_message) end ensure # Clean up rdkafka message if there is one unless message_ptr.null? Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr) end end # Poll for new messages and yield for each received one # # @raise [RdkafkaError] When polling fails # # @yieldparam message [Message] Received message # # @return [nil] def each(&block) loop do message = poll(250) if message block.call(message) else next end end end end end