lib/karafka/connection/topic_consumer.rb in karafka-0.5.0.1 vs lib/karafka/connection/topic_consumer.rb in karafka-0.5.0.2

- old
+ new

@@ -13,11 +13,13 @@ # Opens connection, gets messages and calls a block for each of the incoming messages # @yieldparam [Kafka::FetchedMessage] kafka fetched message # @note This will yield with a raw message - no preprocessing or reformatting def fetch_loop - kafka_consumer.each_message do |message| + send( + @route.batch_mode ? :consume_each_batch : :consume_each_message + ) do |message| yield(message) end end # Gracefuly stops topic consumption @@ -26,30 +28,56 @@ @kafka_consumer = nil end private + # Consumes messages from Kafka in batches + # @yieldparam [Kafka::FetchedMessage] kafka fetched message + def consume_each_batch + kafka_consumer.each_batch do |batch| + batch.messages.each do |message| + yield(message) + end + end + end + + # Consumes messages from Kafka one by one + # @yieldparam [Kafka::FetchedMessage] kafka fetched message + def consume_each_message + kafka_consumer.each_message do |message| + yield(message) + end + end + # @return [Kafka::Consumer] returns a ready to consume Kafka consumer # that is set up to consume a given routes topic def kafka_consumer - return @kafka_consumer if @kafka_consumer - - kafka = Kafka.new( - seed_brokers: ::Karafka::App.config.kafka.hosts, - logger: ::Karafka.logger, - client_id: ::Karafka::App.config.name - ) - - @kafka_consumer = kafka.consumer( + @kafka_consumer ||= kafka.consumer( group_id: @route.group, session_timeout: ::Karafka::App.config.kafka.session_timeout, offset_commit_interval: ::Karafka::App.config.kafka.offset_commit_interval, offset_commit_threshold: ::Karafka::App.config.kafka.offset_commit_threshold, heartbeat_interval: ::Karafka::App.config.kafka.heartbeat_interval - ) + ).tap do |consumer| + consumer.subscribe( + @route.topic, + start_from_beginning: @route.start_from_beginning + ) + end + end - @kafka_consumer.subscribe(@route.topic) - @kafka_consumer + # @return [Kafka] returns a Kafka + # @note We don't cache it internally because we cache kafka_consumer that uses kafka + # object instance + def kafka + Kafka.new( + seed_brokers: ::Karafka::App.config.kafka.hosts, + logger: ::Karafka.logger, + client_id: ::Karafka::App.config.name, + ssl_ca_cert: ::Karafka::App.config.kafka.ssl.ca_cert, + ssl_client_cert: ::Karafka::App.config.kafka.ssl.client_cert, + ssl_client_cert_key: ::Karafka::App.config.kafka.ssl.client_cert_key + ) end end end end