Sha256: e2d37f1594f3660dd985f80ae50772f61be94fe6fb8ce70cdaf06a27e57f9847
Contents?: true
Size: 1.53 KB
Versions: 1
Compression:
Stored size: 1.53 KB
Contents
# frozen_string_literal: true module Kril # Consumers records from Kafka class Consumer def initialize(avro: nil, kafka: nil, config: {}) config[:group_id] ||= '🦐' @avro = avro @kafka = kafka @config = config end def consume_one(topic) consumer = build_consumer(topic, true, @config) msg = nil consumer.each_message do |message| msg = decode(message) consumer.mark_message_as_processed(message) consumer.commit_offsets consumer.stop end msg ensure consumer.stop end def consume_all(topic) config = @config.clone config[:group_id] = SecureRandom.uuid consumer = build_consumer(topic, true, config) consumer.each_message do |message| yield decode(message), consumer end ensure consumer.stop end def listen(topic) consumer = build_consumer(topic, false, @config) consumer.each_message do |message| yield decode(message), consumer end ensure consumer.stop end private def build_consumer(topic, start_from_beginning, config) consumer = @kafka.consumer(config) consumer.subscribe(topic, start_from_beginning: start_from_beginning) consumer end def decode(message) { key: message.key, value: @avro.decode(message.value), offset: message.offset, create_time: message.create_time, topic: message.topic, partition: message.partition } end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
kril-0.1.0 | lib/kril/consumer.rb |