lib/kril/consumer.rb in kril-0.1.0 vs lib/kril/consumer.rb in kril-0.1.1
- old
+ new
@@ -1,17 +1,25 @@
# frozen_string_literal: true
module Kril
- # Consumers records from Kafka
+ # High level abstraction for consuming records from topics.
class Consumer
+ # avro - Avro instance for deserializing records [AvroTurf::Messaging]
+ # kafka - Kafka instance for creating consumers [Kafka]
+ # config - consumer configuration (optional) [Hash]
def initialize(avro: nil, kafka: nil, config: {})
- config[:group_id] ||= '🦐'
+ config[:group_id] ||= 'kril-consumer'
@avro = avro
@kafka = kafka
@config = config
end
+ # Consume a single record from any partition.
+ # Will block indefinitely if no record present.
+ #
+ # topic - topic to consume from [String]
+ # return - deserialized record [String]
def consume_one(topic)
consumer = build_consumer(topic, true, @config)
msg = nil
consumer.each_message do |message|
msg = decode(message)
@@ -22,22 +30,20 @@
msg
ensure
consumer.stop
end
+ # Consume all records from a topic. Each record will be yielded
+ # to block along with consumer instance. Will listen to topic
+ # after all records have been consumed.
+ #
+ # topic - topic to consume from [String]
+ # yields - record, consumer [String, Kafka::Consumer]
+ # return - [nil]
def consume_all(topic)
config = @config.clone
config[:group_id] = SecureRandom.uuid
consumer = build_consumer(topic, true, config)
- consumer.each_message do |message|
- yield decode(message), consumer
- end
- ensure
- consumer.stop
- end
-
- def listen(topic)
- consumer = build_consumer(topic, false, @config)
consumer.each_message do |message|
yield decode(message), consumer
end
ensure
consumer.stop