Sha256: 2bbc13a286522d474ef34902c5feb7595e2f148dc43a98f8c717849076c9a27a
Contents?: true
Size: 741 Bytes
Versions: 3
Compression:
Stored size: 741 Bytes
Contents
require 'socket'

module ManageIQ
  module Messaging
    module Kafka
      # Topic-style publish and subscribe operations for the Kafka client.
      #
      # The methods here call +raw_publish+, +topic_for_publish+, +address+,
      # +consumer+ and +process_topic_message+, none of which are defined in
      # this module; the including class is expected to provide them.
      module Topic
        # Host name of this machine, used as the prefix of the +:persist_ref+
        # generated for subscribers that do not supply their own.
        GROUP_FOR_ADHOC_LISTENERS = Socket.gethostname.freeze

        private

        # Publishes a message described by +options+.
        #
        # The values returned by +topic_for_publish(options)+ are splatted into
        # +raw_publish+ after a leading +true+ argument; the meaning of that flag
        # is defined by +raw_publish+ (not visible here).
        #
        # @param options [Hash] publish options, interpreted by +topic_for_publish+
        # @return whatever +raw_publish+ returns
        def publish_topic_impl(options)
          raw_publish(true, *topic_for_publish(options))
        end

        # Subscribes to the topic addressed by +options+ and hands every received
        # message to +process_topic_message+ together with the caller's block.
        #
        # When +options+ has no truthy +:persist_ref+, one is generated as
        # "<hostname>_<epoch seconds>". The generated value is placed on a copy of
        # +options+, so the caller's hash is never modified (and may be frozen).
        #
        # NOTE(review): the generated reference has one-second resolution, so two
        # ad-hoc subscribers started on the same host within the same second get
        # the same value -- confirm whether +consumer+ needs it to be unique.
        #
        # @param options [Hash] subscription options; +:persist_ref+ is optional
        # @yield forwarded to +process_topic_message+ for each message
        # @return whatever the consumer's +each+ returns
        def subscribe_topic_impl(options, &block)
          topic = address(options)

          consumer_options =
            if options[:persist_ref]
              options
            else
              options.merge(:persist_ref => "#{GROUP_FOR_ADHOC_LISTENERS}_#{Time.now.to_i}")
            end

          # The leading +false+ is interpreted by +consumer+ (defined elsewhere).
          topic_consumer = consumer(false, consumer_options)
          topic_consumer.subscribe(topic)
          topic_consumer.each do |message|
            process_topic_message(topic_consumer, topic, message, &block)
          end
        end
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems