lib/karafka/connection/topic_consumer.rb in karafka-0.5.0.2 vs lib/karafka/connection/topic_consumer.rb in karafka-0.5.0.3
- old
+ new
@@ -1,10 +1,14 @@
module Karafka
module Connection
# Class used as a wrapper around Ruby-Kafka to simplify additional
# features that we provide/might provide in future
class TopicConsumer
+ # How long should we wait before trying to reconnect to Kafka cluster
+ # that went down (in seconds)
+ RECONNECT_TIMEOUT = 5
+
# Creates a queue consumer that will pull the data from Kafka
# @param [Karafka::Routing::Route] route details that will be used to build up a
# queue consumer instance
# @return [Karafka::Connection::QueueConsumer] queue consumer instance
def initialize(route)
@@ -61,9 +65,16 @@
consumer.subscribe(
@route.topic,
start_from_beginning: @route.start_from_beginning
)
end
+ rescue Kafka::ConnectionError
+ # If we would not wait it would totally spam log file with failed
+ # attempts if Kafka is down
+ sleep(RECONNECT_TIMEOUT)
+ # We don't log and just reraise - this will be logged
+ # down the road
+ raise
end
# @return [Kafka] returns a Kafka
# @note We don't cache it internally because we cache kafka_consumer that uses kafka
# object instance