lib/karafka/runner.rb in karafka-2.2.14 vs lib/karafka/runner.rb in karafka-2.3.0.alpha1

- old
+ new

@@ -1,29 +1,39 @@ # frozen_string_literal: true module Karafka # Class used to run the Karafka listeners in separate threads class Runner + def initialize + @manager = App.config.internal.connection.manager + @conductor = App.config.internal.connection.conductor + end + # Starts listening on all the listeners asynchronously and handles the jobs queue closing # after listeners are done with their work. def call # Despite possibility of having several independent listeners, we aim to have one queue for # jobs across and one workers poll for that jobs_queue = App.config.internal.processing.jobs_queue_class.new workers = Processing::WorkersBatch.new(jobs_queue) listeners = Connection::ListenersBatch.new(jobs_queue) + # Register all the listeners so they can be started and managed + @manager.register(listeners) + workers.each(&:async_call) - listeners.each(&:async_call) # We aggregate threads here for a supervised shutdown process Karafka::Server.workers = workers Karafka::Server.listeners = listeners Karafka::Server.jobs_queue = jobs_queue - # All the listener threads need to finish - listeners.each(&:join) + until @manager.done? + @conductor.wait + + @manager.control + end # We close the jobs queue only when no listener threads are working. # This ensures, that everything was closed prior to us not accepting anymore jobs and that # no more jobs will be enqueued. Since each listener waits for jobs to finish, once those # are done, we can close.