lib/karafka/fetcher.rb in karafka-1.1.2 vs lib/karafka/fetcher.rb in karafka-1.2.0.beta1

- old
+ new

@@ -3,46 +3,42 @@ module Karafka # Class used to run the Karafka consumer and handle shutting down, restarting etc # @note Creating multiple fetchers will result in having multiple connections to the same # topics, which means that if there are no partitions, it won't use them. class Fetcher - # Starts listening on all the listeners asynchronously - # Fetch loop should never end, which means that we won't create more actor clusters - # so we don't have to terminate them - def fetch_loop - threads = listeners.map do |listener| - # We abort on exception because there should be an exception handling developed for - # each listener running in separate threads, so the exceptions should never leak - # and if that happens, it means that something really bad happened and we should stop - # the whole process - Thread - .new { listener.fetch_loop(processor) } - .tap { |thread| thread.abort_on_exception = true } - end + class << self + # Starts listening on all the listeners asynchronously + # Fetch loop should never end, which means that we won't create more actor clusters + # so we don't have to terminate them + def call + threads = listeners.map do |listener| + # We abort on exception because there should be an exception handling developed for + # each listener running in separate threads, so the exceptions should never leak + # and if that happens, it means that something really bad happened and we should stop + # the whole process + Thread + .new { listener.call } + .tap { |thread| thread.abort_on_exception = true } + end - threads.each(&:join) - # If anything crashes here, we need to raise the error and crush the runner because it means - # that something really bad happened - rescue StandardError => e - Karafka.monitor.notice_error(self.class, e) - Karafka::App.stop! - raise e - end - - private - - # @return [Array<Karafka::Connection::Listener>] listeners that will consume messages - def listeners - @listeners ||= App.consumer_groups.active.map do |consumer_group| - Karafka::Connection::Listener.new(consumer_group) + # We aggregate threads here for a supervised shutdown process + threads.each { |thread| Karafka::Server.consumer_threads << thread } + threads.each(&:join) + # If anything crashes here, we need to raise the error and crush the runner because it means + # that something terrible happened + rescue StandardError => e + Karafka.monitor.instrument('fetcher.call.error', caller: self, error: e) + Karafka::App.stop! + raise e end - end - # @return [Proc] proc that should be processed when a messages arrive - # @yieldparam messages [Array<Kafka::FetchedMessage>] messages from kafka (raw) - def processor - lambda do |group_id, messages| - Karafka::Connection::Processor.process(group_id, messages) + private + + # @return [Array<Karafka::Connection::Listener>] listeners that will consume messages + def listeners + @listeners ||= App.consumer_groups.active.map do |consumer_group| + Karafka::Connection::Listener.new(consumer_group) + end end end end end