Sha256: 1d82e29202fa5530b146dd18ca8f9269679b880e8abe8656aef317e592fd106c

Contents?: true

Size: 1.53 KB

Versions: 1

Compression:

Stored size: 1.53 KB

Contents

module Karafka
  module Connection
    # Class used as a wrapper around Ruby-Kafka to simplify additional
    # features that we provide/might provide in future
    class TopicConsumer
      # Creates a queue consumer that will pull the data from Kafka
      # @param [Karafka::Routing::Route] route details that will be used to build up a
      #   queue consumer instance
      # @return [Karafka::Connection::QueueConsumer] queue consumer instance
      def initialize(route)
        @route = route
      end

      # Opens connection, gets messages and calls a block for each of the incoming messages
      # @yieldparam [Kafka::FetchedMessage] kafka fetched message
      # @note This will yield with a raw message - no preprocessing or reformatting
      def fetch_loop
        kafka_consumer.each_message do |message|
          yield(message)
        end
      end

      # Gracefuly stops topic consumption
      def stop
        kafka_consumer.stop
        @kafka_consumer = nil
      end

      private

      # @return [Kafka::Consumer] returns a ready to consume Kafka consumer
      #   that is set up to consume a given routes topic
      def kafka_consumer
        return @kafka_consumer if @kafka_consumer

        kafka = Kafka.new(
          seed_brokers: ::Karafka::App.config.kafka.hosts,
          logger: ::Karafka.logger,
          client_id: ::Karafka::App.config.name
        )

        @kafka_consumer = kafka.consumer(group_id: @route.group)
        @kafka_consumer.subscribe(@route.topic)
        @kafka_consumer
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
karafka-0.5.0 lib/karafka/connection/topic_consumer.rb