lib/manageiq/messaging/kafka/topic.rb in manageiq-messaging-0.1.0 vs lib/manageiq/messaging/kafka/topic.rb in manageiq-messaging-0.1.1
- old
+ new
@@ -13,10 +13,12 @@
persist_ref = options[:persist_ref]
if persist_ref
consumer = topic_consumer(persist_ref)
consumer.subscribe(topic, :start_from_beginning => false)
- consumer.each_message { |message| process_topic_message(topic, message, &block) }
+ consumer.each_message(:automatically_mark_as_processed => auto_ack?(options)) do |message|
+ process_topic_message(topic, message, &block)
+ end
else
kafka_client.each_message(:topic => topic, :start_from_beginning => false) do |message|
process_topic_message(topic, message, &block)
end
end