lib/karafka/connection/topic_consumer.rb in karafka-0.5.0.2 vs lib/karafka/connection/topic_consumer.rb in karafka-0.5.0.3

- old
+ new

@@ -1,10 +1,14 @@ module Karafka module Connection # Class used as a wrapper around Ruby-Kafka to simplify additional # features that we provide/might provide in future class TopicConsumer + # How long should we wait before trying to reconnect to Kafka cluster + # that went down (in seconds) + RECONNECT_TIMEOUT = 5 + # Creates a queue consumer that will pull the data from Kafka # @param [Karafka::Routing::Route] route details that will be used to build up a # queue consumer instance # @return [Karafka::Connection::QueueConsumer] queue consumer instance def initialize(route) @@ -61,9 +65,16 @@ consumer.subscribe( @route.topic, start_from_beginning: @route.start_from_beginning ) end + rescue Kafka::ConnectionError + # If we would not wait it would totally spam log file with failed + # attempts if Kafka is down + sleep(RECONNECT_TIMEOUT) + # We don't log and just reraise - this will be logged + # down the road + raise end # @return [Kafka] returns a Kafka # @note We don't cache it internally because we cache kafka_consumer that uses kafka # object instance