lib/karafka/runner.rb in karafka-2.2.14 vs lib/karafka/runner.rb in karafka-2.3.0.alpha1
- old
+ new
@@ -1,29 +1,39 @@
# frozen_string_literal: true
module Karafka
# Class used to run the Karafka listeners in separate threads
class Runner
+ def initialize
+ @manager = App.config.internal.connection.manager
+ @conductor = App.config.internal.connection.conductor
+ end
+
# Starts listening on all the listeners asynchronously and handles the jobs queue closing
# after listeners are done with their work.
def call
# Despite possibility of having several independent listeners, we aim to have one queue for
# jobs across and one workers poll for that
jobs_queue = App.config.internal.processing.jobs_queue_class.new
workers = Processing::WorkersBatch.new(jobs_queue)
listeners = Connection::ListenersBatch.new(jobs_queue)
+ # Register all the listeners so they can be started and managed
+ @manager.register(listeners)
+
workers.each(&:async_call)
- listeners.each(&:async_call)
# We aggregate threads here for a supervised shutdown process
Karafka::Server.workers = workers
Karafka::Server.listeners = listeners
Karafka::Server.jobs_queue = jobs_queue
- # All the listener threads need to finish
- listeners.each(&:join)
+ until @manager.done?
+ @conductor.wait
+
+ @manager.control
+ end
# We close the jobs queue only when no listener threads are working.
# This ensures, that everything was closed prior to us not accepting anymore jobs and that
# no more jobs will be enqueued. Since each listener waits for jobs to finish, once those
# are done, we can close.