Sha256: 57c05b109cd9a6f4296c3fc424fca6faaf30243a7f767f5f63e439191a865633

Contents?: true

Size: 1.37 KB

Versions: 4

Compression:

Stored size: 1.37 KB

Contents

module Karafka
  # Class used to run the Karafka consumer and handle shutting down, restarting etc
  # @note Creating multiple fetchers will result in having multiple connections to the same
  #   topics, which means that if there are no partitions, it won't use them.
  class Fetcher
    # Starts listening on all the listeners asynchronously
    # Fetch loop should never end, which means that we won't create more actor clusters
    # so we don't have to terminate them
    def fetch_loop
      futures = listeners.map do |listener|
        listener.future.public_send(:fetch_loop, consumer)
      end

      futures.map(&:value)
    # If anything crashes here, we need to raise the error and crush the runner because it means
    # that something really bad happened
    rescue => e
      Karafka.monitor.notice_error(self.class, e)
      Karafka::App.stop!
      raise e
    end

    private

    # @return [Array<Karafka::Connection::Listener>] listeners that will consume messages
    def listeners
      @listeners ||= App.routes.map do |route|
        Karafka::Connection::Listener.new(route)
      end
    end

    # @return [Proc] proc that should be processed when a message arrives
    # @yieldparam message [Kafka::FetchedMessage] message from kafka (raw one)
    def consumer
      lambda do |message|
        Karafka::Connection::Consumer.new.consume(message)
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
karafka-0.5.0.3 lib/karafka/fetcher.rb
karafka-0.5.0.2 lib/karafka/fetcher.rb
karafka-0.5.0.1 lib/karafka/fetcher.rb
karafka-0.5.0 lib/karafka/fetcher.rb