lib/karafka/fetcher.rb in karafka-1.0.1 vs lib/karafka/fetcher.rb in karafka-1.1.0.alpha1
- old
+ new
@@ -7,18 +7,24 @@
class Fetcher
# Starts listening on all the listeners asynchronously
# Fetch loop should never end, which means that we won't create more actor clusters
# so we don't have to terminate them
def fetch_loop
- futures = listeners.map do |listener|
- listener.future.public_send(:fetch_loop, processor)
+ threads = listeners.map do |listener|
+ # We abort on exception because there should be an exception handling developed for
+ # each listener running in separate threads, so the exceptions should never leak
+ # and if that happens, it means that something really bad happened and we should stop
+ # the whole process
+ Thread
+ .new { listener.fetch_loop(processor) }
+ .tap { |thread| thread.abort_on_exception = true }
end
- futures.map(&:value)
+ threads.each(&:join)
# If anything crashes here, we need to raise the error and crush the runner because it means
# that something really bad happened
- rescue => e
+ rescue StandardError => e
Karafka.monitor.notice_error(self.class, e)
Karafka::App.stop!
raise e
end
@@ -33,10 +39,10 @@
# @return [Proc] proc that should be processed when a messages arrive
# @yieldparam messages [Array<Kafka::FetchedMessage>] messages from kafka (raw)
def processor
lambda do |group_id, messages|
- Karafka::Connection::MessagesProcessor.process(group_id, messages)
+ Karafka::Connection::Processor.process(group_id, messages)
end
end
end
end