Sha256: 4a86322411de2ad6e37243bf2e65fed9727bfb2c262c94f74541e64829bc247f
Contents?: true
Size: 1.48 KB
Versions: 1
Compression:
Stored size: 1.48 KB
Contents
module Rdkafka class Consumer include Enumerable def initialize(native_kafka) @native_kafka = native_kafka end def subscribe(*topics) # Create topic partition list with topics and no partition set tpl = Rdkafka::FFI.rd_kafka_topic_partition_list_new(topics.length) topics.each do |topic| Rdkafka::FFI.rd_kafka_topic_partition_list_add( tpl, topic, -1 ) end # Subscribe to topic partition list and check this was successful response = Rdkafka::FFI.rd_kafka_subscribe(@native_kafka, tpl) if response != 0 raise Rdkafka::RdkafkaError.new(response) end ensure # Clean up the topic partition list Rdkafka::FFI.rd_kafka_topic_partition_list_destroy(tpl) end def commit(async=false) response = Rdkafka::FFI.rd_kafka_commit(@native_kafka, nil, async) if response != 0 raise Rdkafka::RdkafkaError.new(response) end end def poll(timeout_ms) message_ptr = Rdkafka::FFI.rd_kafka_consumer_poll(@native_kafka, timeout_ms) if message_ptr.null? nil else message = Rdkafka::FFI::Message.new(message_ptr) if message.err != 0 raise Rdkafka::RdkafkaError.new(message.err) end message end end def each(&block) loop do message = poll(1000) if message block.call(message) else next end end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
rdkafka-0.1.9 | lib/rdkafka/consumer.rb |