Sha256: 7068acebd08479bedc40aa78dcf257f72940dc2e5e9d31758891d5d525724d0e
Contents?: true
Size: 1.66 KB
Versions: 9
Compression:
Stored size: 1.66 KB
Contents
module Rdkafka
  # Thin wrapper around a native Kafka consumer handle. Messages are fetched
  # through the Rdkafka::FFI bindings; the class is Enumerable, iterating
  # over polled messages.
  class Consumer
    include Enumerable

    # @param native_kafka the native handle passed as first argument to every
    #   Rdkafka::FFI call made by this consumer
    def initialize(native_kafka)
      @native_kafka = native_kafka
    end

    # Subscribe to one or more topics.
    #
    # @param topics [Array<String>] topic names to subscribe to
    # @raise [Rdkafka::RdkafkaError] when the native subscribe call returns a
    #   non-zero response
    def subscribe(*topics)
      # Create topic partition list with topics and no partition set
      tpl = Rdkafka::FFI.rd_kafka_topic_partition_list_new(topics.length)
      topics.each do |topic|
        Rdkafka::FFI.rd_kafka_topic_partition_list_add(tpl, topic, -1)
      end

      # Subscribe to topic partition list and check this was successful
      response = Rdkafka::FFI.rd_kafka_subscribe(@native_kafka, tpl)
      raise Rdkafka::RdkafkaError.new(response) if response != 0
    ensure
      # Clean up the topic partition list. Guarded so that a failure while
      # creating the list does not hand a nil (NULL) pointer to the native
      # destroy function.
      Rdkafka::FFI.rd_kafka_topic_partition_list_destroy(tpl) if tpl
    end

    # Commit offsets through the native commit call.
    #
    # nil is passed as the offsets argument.
    # NOTE(review): per librdkafka this means "commit the current
    # assignment's offsets" -- confirm against the bound signature.
    #
    # @param async [Boolean] forwarded unchanged to rd_kafka_commit
    # @raise [Rdkafka::RdkafkaError] when the native call returns a non-zero
    #   response
    def commit(async=false)
      response = Rdkafka::FFI.rd_kafka_commit(@native_kafka, nil, async)
      raise Rdkafka::RdkafkaError.new(response) if response != 0
    end

    # Poll for a single message.
    #
    # @param timeout_ms [Integer] timeout forwarded to the native poll call
    # @return [Rdkafka::FFI::Message, nil] the wrapped message, or nil when
    #   the native call returned a null pointer
    # @raise [Rdkafka::RdkafkaError] when the message carries a non-zero err
    def poll(timeout_ms=100)
      message_ptr = Rdkafka::FFI.rd_kafka_consumer_poll(@native_kafka, timeout_ms)
      return nil if message_ptr.null?

      message = Rdkafka::FFI::Message.new(message_ptr)
      raise Rdkafka::RdkafkaError.new(message.err) if message.err != 0

      message
    end

    # Poll in an endless loop, passing every received message to the block.
    #
    # Without a block an Enumerator is returned, so the consumer can be used
    # with external or lazy enumeration (e.g. `consumer.each.lazy`).
    #
    # @yieldparam message the message returned by #poll
    # @return [Enumerator] when no block is given
    def each(&block)
      return enum_for(:each) unless block

      loop do
        message = poll(10)
        if message
          block.call(message)
        else
          # Sleep here instead of using a longer poll timeout so interrupting
          # the program works properly, MRI has a hard time interrupting FFI
          # calls.
          sleep 0.1
        end
      end
    end
  end
end
Version data entries
9 entries across 9 versions & 1 rubygems