lib/karafka/runner.rb in karafka-2.0.0.beta1 vs lib/karafka/runner.rb in karafka-2.0.0.beta2
- old
+ new
@@ -1,36 +1,41 @@
# frozen_string_literal: true
module Karafka
# Class used to run the Karafka listeners in separate threads
class Runner
- # Starts listening on all the listeners asynchronously
- # Fetch loop should never end. If they do, it is a critical error
+ # Starts listening on all the listeners asynchronously and handles the jobs queue closing
+ # after listeners are done with their work.
def call
# Despite the possibility of having several independent listeners, we aim to have one queue
# for jobs across all of them and one workers pool for that
jobs_queue = Processing::JobsQueue.new
workers = Processing::WorkersBatch.new(jobs_queue)
- Karafka::Server.workers = workers
+ listeners = Connection::ListenersBatch.new(jobs_queue)
- threads = listeners(jobs_queue).map do |listener|
- # We abort on exception because there should be an exception handling developed for
- # each listener running in separate threads, so the exceptions should never leak
- # and if that happens, it means that something really bad happened and we should stop
- # the whole process
- Thread
- .new { listener.call }
- .tap { |thread| thread.abort_on_exception = true }
- end
+ workers.each(&:async_call)
+ listeners.each(&:async_call)
# We aggregate threads here for a supervised shutdown process
- Karafka::Server.consumer_threads = threads
+ Karafka::Server.workers = workers
+ Karafka::Server.listeners = listeners
# All the listener threads need to finish
- threads.each(&:join)
+ listeners.each(&:join)
+
+ # We close the jobs queue only when no listener threads are working.
+ # This ensures that everything was closed before we stop accepting any more jobs and that
+ # no more jobs will be enqueued. Since each listener waits for jobs to finish, once those
+ # are done, we can close.
+ jobs_queue.close
+
# All the workers need to stop processing anything before we can stop the runner completely
+ # This ensures that even async long-running jobs have time to finish before we are done
+ # with everything. One thing worth keeping in mind though: it is the end user's
+ # responsibility to handle the shutdown detection in their long-running processes.
+ # Otherwise, if the timeout is exceeded, there will be a forced shutdown.
workers.each(&:join)
# If anything crashes here, we need to raise the error and crash the runner because it means
# that something terrible happened
rescue StandardError => e
Karafka.monitor.instrument(
@@ -39,21 +44,8 @@
error: e,
type: 'runner.call.error'
)
Karafka::App.stop!
raise e
- end
-
- private
-
- # @param jobs_queue [Processing::JobsQueue] the main processing queue
- # @return [Array<Karafka::Connection::Listener>] listeners that will consume messages for each
- # of the subscription groups
- def listeners(jobs_queue)
- App
- .subscription_groups
- .map do |subscription_group|
- Karafka::Connection::Listener.new(subscription_group, jobs_queue)
- end
end
end
end