Sha256: e2d37f1594f3660dd985f80ae50772f61be94fe6fb8ce70cdaf06a27e57f9847

Contents?: true

Size: 1.53 KB

Versions: 1

Compression:

Stored size: 1.53 KB

Contents

# frozen_string_literal: true

module Kril
  # Consumers records from Kafka
  class Consumer
    def initialize(avro: nil, kafka: nil, config: {})
      config[:group_id] ||= '🦐'
      @avro = avro
      @kafka = kafka
      @config = config
    end

    def consume_one(topic)
      consumer = build_consumer(topic, true, @config)
      msg = nil
      consumer.each_message do |message|
        msg = decode(message)
        consumer.mark_message_as_processed(message)
        consumer.commit_offsets
        consumer.stop
      end
      msg
    ensure
      consumer.stop
    end

    def consume_all(topic)
      config = @config.clone
      config[:group_id] = SecureRandom.uuid
      consumer = build_consumer(topic, true, config)
      consumer.each_message do |message|
        yield decode(message), consumer
      end
    ensure
      consumer.stop
    end

    def listen(topic)
      consumer = build_consumer(topic, false, @config)
      consumer.each_message do |message|
        yield decode(message), consumer
      end
    ensure
      consumer.stop
    end

    private

    def build_consumer(topic, start_from_beginning, config)
      consumer = @kafka.consumer(config)
      consumer.subscribe(topic, start_from_beginning: start_from_beginning)
      consumer
    end

    def decode(message)
      {
        key: message.key,
        value: @avro.decode(message.value),
        offset: message.offset,
        create_time: message.create_time,
        topic: message.topic,
        partition: message.partition
      }
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
kril-0.1.0 lib/kril/consumer.rb