lib/karafka/fetcher.rb in karafka-1.0.1 vs lib/karafka/fetcher.rb in karafka-1.1.0.alpha1

- old
+ new

@@ -7,18 +7,24 @@ class Fetcher # Starts listening on all the listeners asynchronously # Fetch loop should never end, which means that we won't create more actor clusters # so we don't have to terminate them def fetch_loop - futures = listeners.map do |listener| - listener.future.public_send(:fetch_loop, processor) + threads = listeners.map do |listener| + # We abort on exception because there should be an exception handling developed for + # each listener running in separate threads, so the exceptions should never leak + # and if that happens, it means that something really bad happened and we should stop + # the whole process + Thread + .new { listener.fetch_loop(processor) } + .tap { |thread| thread.abort_on_exception = true } end - futures.map(&:value) + threads.each(&:join) # If anything crashes here, we need to raise the error and crush the runner because it means # that something really bad happened - rescue => e + rescue StandardError => e Karafka.monitor.notice_error(self.class, e) Karafka::App.stop! raise e end @@ -33,10 +39,10 @@ # @return [Proc] proc that should be processed when a messages arrive # @yieldparam messages [Array<Kafka::FetchedMessage>] messages from kafka (raw) def processor lambda do |group_id, messages| - Karafka::Connection::MessagesProcessor.process(group_id, messages) + Karafka::Connection::Processor.process(group_id, messages) end end end end