lib/kril/consumer.rb in kril-0.1.0 vs lib/kril/consumer.rb in kril-0.1.1

- old
+ new

@@ -1,17 +1,25 @@ # frozen_string_literal: true module Kril - # Consumers records from Kafka + # High level abstraction for consuming records from topics. class Consumer + # avro - Avro instance for deserializing records [AvroTurf::Messaging] + # kafka - Kafka instance for creating consumers [Kafka] + # config - consumer configuration (optional) [Hash] def initialize(avro: nil, kafka: nil, config: {}) - config[:group_id] ||= '🦐' + config[:group_id] ||= 'kril-consumer' @avro = avro @kafka = kafka @config = config end + # Consume a single record from any partition. + # Will block indefinitely if no record present. + # + # topic - topic to consume from [String] + # return - deserialized record [String] def consume_one(topic) consumer = build_consumer(topic, true, @config) msg = nil consumer.each_message do |message| msg = decode(message) @@ -22,22 +30,20 @@ msg ensure consumer.stop end + # Consume all records from a topic. Each record will be yielded + # to block along with consumer instance. Will listen to topic + # after all records have been consumed. + # + # topic - topic to consume from [String] + # yields - record, consumer [String, Kafka::Consumer] + # return - [nil] def consume_all(topic) config = @config.clone config[:group_id] = SecureRandom.uuid consumer = build_consumer(topic, true, config) - consumer.each_message do |message| - yield decode(message), consumer - end - ensure - consumer.stop - end - - def listen(topic) - consumer = build_consumer(topic, false, @config) consumer.each_message do |message| yield decode(message), consumer end ensure consumer.stop