lib/karafka/connection/topic_consumer.rb in karafka-0.5.0.1 vs lib/karafka/connection/topic_consumer.rb in karafka-0.5.0.2
- old
+ new
@@ -13,11 +13,13 @@
# Opens the connection, fetches messages and calls the given block for each incoming message
# @yieldparam [Kafka::FetchedMessage] message fetched from Kafka
# @note This will yield with a raw message - no preprocessing or reformatting
def fetch_loop
- kafka_consumer.each_message do |message|
+ send(
+ @route.batch_mode ? :consume_each_batch : :consume_each_message
+ ) do |message|
yield(message)
end
end
# Gracefully stops topic consumption
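
The new fetch_loop picks its fetch strategy from the route's batch_mode flag. As a rough sketch of how a route could opt in, assuming the 0.5.x routing DSL exposes a batch_mode setting (the EventsController name here is hypothetical):

  Karafka::App.routes.draw do
    topic :events do
      controller EventsController # hypothetical controller class
      batch_mode true             # fetch via each_batch instead of each_message
    end
  end
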
@@ -26,30 +28,56 @@
@kafka_consumer = nil
end
private
+ # Consumes messages from Kafka in batches
+ # @yieldparam [Kafka::FetchedMessage] message fetched from Kafka
+ def consume_each_batch
+ kafka_consumer.each_batch do |batch|
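+ # batch is a Kafka::FetchedBatch; unpacking it here means callers
+ # still receive individual Kafka::FetchedMessage objects, exactly
+ # as they do in consume_each_message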
+ batch.messages.each do |message|
+ yield(message)
+ end
+ end
+ end
+
+ # Consumes messages from Kafka one by one
+ # @yieldparam [Kafka::FetchedMessage] message fetched from Kafka
+ def consume_each_message
+ kafka_consumer.each_message do |message|
+ yield(message)
+ end
+ end
+
# @return [Kafka::Consumer] returns a ready-to-consume Kafka consumer
# that is set up to consume a given route's topic
def kafka_consumer
- return @kafka_consumer if @kafka_consumer
-
- kafka = Kafka.new(
- seed_brokers: ::Karafka::App.config.kafka.hosts,
- logger: ::Karafka.logger,
- client_id: ::Karafka::App.config.name
- )
-
- @kafka_consumer = kafka.consumer(
+ @kafka_consumer ||= kafka.consumer(
group_id: @route.group,
session_timeout: ::Karafka::App.config.kafka.session_timeout,
offset_commit_interval: ::Karafka::App.config.kafka.offset_commit_interval,
offset_commit_threshold: ::Karafka::App.config.kafka.offset_commit_threshold,
heartbeat_interval: ::Karafka::App.config.kafka.heartbeat_interval
- )
+ ).tap do |consumer|
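+ # start_from_beginning controls whether a consumer group with no
+ # committed offset starts from the earliest or the latest offset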
+ consumer.subscribe(
+ @route.topic,
+ start_from_beginning: @route.start_from_beginning
+ )
+ end
+ end
- @kafka_consumer.subscribe(@route.topic)
- @kafka_consumer
+ # @return [Kafka] returns a Kafka client instance
+ # @note We don't cache the client itself, because we already cache the
+ #   kafka_consumer that is built on top of it
+ def kafka
+ Kafka.new(
+ seed_brokers: ::Karafka::App.config.kafka.hosts,
+ logger: ::Karafka.logger,
+ client_id: ::Karafka::App.config.name,
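+ # SSL settings come straight from the app config; unset values
+ # pass through as nil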
+ ssl_ca_cert: ::Karafka::App.config.kafka.ssl.ca_cert,
+ ssl_client_cert: ::Karafka::App.config.kafka.ssl.client_cert,
+ ssl_client_cert_key: ::Karafka::App.config.kafka.ssl.client_cert_key
+ )
end
end
end
end
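
The new kafka method wires three SSL options from config.kafka.ssl into the ruby-kafka client. A minimal setup sketch, assuming the Karafka::App.setup configuration block and placeholder certificate paths:

  Karafka::App.setup do |config|
    config.name = 'example_app'
    config.kafka.hosts = ['localhost:9092']
    # TLS material forwarded to Kafka.new by the kafka method above
    config.kafka.ssl.ca_cert = File.read('ca_cert.pem')
    config.kafka.ssl.client_cert = File.read('client_cert.pem')
    config.kafka.ssl.client_cert_key = File.read('client_cert_key.pem')
  end

If the ssl values are left unset, nil is forwarded to Kafka.new, which should preserve the previous plaintext behavior.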