lib/karafka/runner.rb in karafka-2.0.0.beta1 vs lib/karafka/runner.rb in karafka-2.0.0.beta2

- old
+ new

@@ -1,36 +1,41 @@ # frozen_string_literal: true module Karafka # Class used to run the Karafka listeners in separate threads class Runner - # Starts listening on all the listeners asynchronously - # Fetch loop should never end. If it does, it is a critical error + # Starts listening on all the listeners asynchronously and handles the jobs queue closing + # after listeners are done with their work. def call # Despite possibility of having several independent listeners, we aim to have one queue for # jobs across and one workers pool for those jobs jobs_queue = Processing::JobsQueue.new workers = Processing::WorkersBatch.new(jobs_queue) - Karafka::Server.workers = workers + listeners = Connection::ListenersBatch.new(jobs_queue) - threads = listeners(jobs_queue).map do |listener| - # We abort on exception because there should be exception handling developed for - # each listener running in separate threads, so the exceptions should never leak - # and if that happens, it means that something really bad happened and we should stop - # the whole process - Thread - .new { listener.call } - .tap { |thread| thread.abort_on_exception = true } - end + workers.each(&:async_call) + listeners.each(&:async_call) # We aggregate threads here for a supervised shutdown process - Karafka::Server.consumer_threads = threads + Karafka::Server.workers = workers + Karafka::Server.listeners = listeners # All the listener threads need to finish - threads.each(&:join) + listeners.each(&:join) + + # We close the jobs queue only when no listener threads are working. + # This ensures that everything was closed prior to us not accepting any more jobs and that + # no more jobs will be enqueued. Since each listener waits for jobs to finish, once those + # are done, we can close. + jobs_queue.close + # All the workers need to stop processing anything before we can stop the runner completely + # This ensures that even async long-running jobs have time to finish before we are done + # with everything. 
One thing worth keeping in mind though: It is the end user's responsibility + # to handle the shutdown detection in their long-running processes. Otherwise, if the timeout + # is exceeded, there will be a forced shutdown. workers.each(&:join) # If anything crashes here, we need to raise the error and crash the runner because it means # that something terrible happened rescue StandardError => e Karafka.monitor.instrument( @@ -39,21 +44,8 @@ error: e, type: 'runner.call.error' ) Karafka::App.stop! raise e - end - - private - - # @param jobs_queue [Processing::JobsQueue] the main processing queue - # @return [Array<Karafka::Connection::Listener>] listeners that will consume messages for each - # of the subscription groups - def listeners(jobs_queue) - App - .subscription_groups - .map do |subscription_group| - Karafka::Connection::Listener.new(subscription_group, jobs_queue) - end end end end