Sha256: cc477ee4e849cf8dd567d2866729f9b3b403e20f79b4e5a421c278e14ad95c10

Contents?: true

Size: 1.69 KB

Versions: 27

Compression:

Stored size: 1.69 KB

Contents

# frozen_string_literal: true

module Karafka
  # Class used to run the Karafka consumer: it starts every listener in its own thread and
  # handles the failure path (instrument the error, stop the app, re-raise).
  # @note Creating multiple fetchers will result in having multiple connections to the same
  #   topics, which means that if there are no partitions left for them, they won't be used.
  class Fetcher
    # Starts all the listeners asynchronously (one thread per listener) and then blocks waiting
    # on those threads. The fetch loops are expected to never end, so under normal operation
    # this method does not return.
    #
    # @return [Array<Thread>] the listener threads, only when every one of them finished cleanly
    # @raise [StandardError] any error that reached this method, re-raised after it has been
    #   instrumented and the app has been asked to stop
    def call
      threads = listeners.map do |listener|
        Thread.new do
          # We abort on exception because there should be an exception handling developed for
          # each listener running in separate threads, so the exceptions should never leak
          # and if that happens, it means that something really bad happened and we should stop
          # the whole process.
          # The flag is set from inside the thread, before the listener starts, so there is no
          # window in which the listener can fail while the flag is still unset. Threads are
          # joined in order, so such a failure could otherwise stay hidden behind an earlier
          # listener that never finishes.
          Thread.current.abort_on_exception = true
          listener.call
        end
      end

      # We aggregate threads here for a supervised shutdown process
      threads.each { |thread| Karafka::Server.consumer_threads << thread }
      threads.each(&:join)
    # If anything crashes here, we need to raise the error and crash the runner because it means
    # that something terrible happened
    rescue StandardError => e
      Karafka.monitor.instrument('fetcher.call.error', caller: self, error: e)
      Karafka::App.stop!
      raise e
    end

    private

    # Builds (once) and memoizes one listener per active consumer group
    # @return [Array<Karafka::Connection::Listener>] listeners that will consume messages
    def listeners
      @listeners ||= App.consumer_groups.active.map do |consumer_group|
        Karafka::Connection::Listener.new(consumer_group)
      end
    end
  end
end

Version data entries

27 entries across 27 versions & 1 rubygems

Version Path
karafka-1.4.15 lib/karafka/fetcher.rb
karafka-1.4.14 lib/karafka/fetcher.rb
karafka-1.4.13 lib/karafka/fetcher.rb
karafka-1.4.12 lib/karafka/fetcher.rb
karafka-1.4.11 lib/karafka/fetcher.rb
karafka-1.4.10 lib/karafka/fetcher.rb
karafka-1.4.9 lib/karafka/fetcher.rb
karafka-1.4.8 lib/karafka/fetcher.rb
karafka-1.4.7 lib/karafka/fetcher.rb
karafka-1.4.6 lib/karafka/fetcher.rb
karafka-1.4.5 lib/karafka/fetcher.rb
karafka-1.4.4 lib/karafka/fetcher.rb
karafka-1.4.3 lib/karafka/fetcher.rb
karafka-1.4.2 lib/karafka/fetcher.rb
karafka-1.4.1 lib/karafka/fetcher.rb
karafka-1.4.0 lib/karafka/fetcher.rb
karafka-1.4.0.rc2 lib/karafka/fetcher.rb
karafka-1.4.0.rc1 lib/karafka/fetcher.rb
karafka-1.3.7 lib/karafka/fetcher.rb
karafka-1.3.6 lib/karafka/fetcher.rb