lib/manageiq/messaging/kafka/topic.rb in manageiq-messaging-0.1.3 vs lib/manageiq/messaging/kafka/topic.rb in manageiq-messaging-0.1.4
- old
+ new
@@ -8,13 +8,14 @@
raw_publish(true, *topic_for_publish(options))
end
def subscribe_topic_impl(options, &block)
topic = address(options)
- persist_ref = options[:persist_ref]
+ persist_ref = options[:persist_ref]
+ session_timeout = options[:session_timeout]
if persist_ref
- consumer = topic_consumer(persist_ref)
+ consumer = topic_consumer(persist_ref, session_timeout)
consumer.subscribe(topic, :start_from_beginning => false)
consumer.each_message(:automatically_mark_as_processed => auto_ack?(options)) do |message|
process_topic_message(topic, message, &block)
end
else