lib/manageiq/messaging/kafka/topic.rb in manageiq-messaging-0.1.0 vs lib/manageiq/messaging/kafka/topic.rb in manageiq-messaging-0.1.1

- old
+ new

@@ -13,10 +13,12 @@ persist_ref = options[:persist_ref] if persist_ref consumer = topic_consumer(persist_ref) consumer.subscribe(topic, :start_from_beginning => false) - consumer.each_message { |message| process_topic_message(topic, message, &block) } + consumer.each_message(:automatically_mark_as_processed => auto_ack?(options)) do |message| + process_topic_message(topic, message, &block) + end else kafka_client.each_message(:topic => topic, :start_from_beginning => false) do |message| process_topic_message(topic, message, &block) end end