Sha256: 2bbc13a286522d474ef34902c5feb7595e2f148dc43a98f8c717849076c9a27a

Contents?: true

Size: 741 Bytes

Versions: 3

Compression:

Stored size: 741 Bytes

Contents

require 'socket'

module ManageIQ
  module Messaging
    module Kafka
      module Topic
        GROUP_FOR_ADHOC_LISTENERS = Socket.gethostname.freeze

        private

        def publish_topic_impl(options)
          raw_publish(true, *topic_for_publish(options))
        end

        def subscribe_topic_impl(options, &block)
          topic = address(options)

          options[:persist_ref] = "#{GROUP_FOR_ADHOC_LISTENERS}_#{Time.now.to_i}" unless options[:persist_ref]
          topic_consumer = consumer(false, options)
          topic_consumer.subscribe(topic)
          topic_consumer.each do |message|
            process_topic_message(topic_consumer, topic, message, &block)
          end
        end
      end
    end
  end
end
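
The default persist_ref built in subscribe_topic_impl can be illustrated in isolation. The following is a minimal sketch, not code from the gem: the module AdhocGroupSketch and the method default_persist_ref are hypothetical names introduced for illustration, and the `now` parameter is added only so the result is reproducible. Unlike the original, the sketch returns the value rather than writing it back into the options hash. Judging by the constant name, the value presumably serves as a consumer group identifier, but this file does not confirm that.

```ruby
require 'socket'

# Hypothetical module, not part of manageiq-messaging.
module AdhocGroupSketch
  # Same definition as the constant in the file above.
  GROUP_FOR_ADHOC_LISTENERS = Socket.gethostname.freeze

  # Returns the caller-supplied :persist_ref if present, otherwise
  # "<hostname>_<epoch seconds>", mirroring the expression in
  # subscribe_topic_impl. `now` is an assumption added for testability.
  def self.default_persist_ref(options, now = Time.now)
    options[:persist_ref] || "#{GROUP_FOR_ADHOC_LISTENERS}_#{now.to_i}"
  end
end

# No :persist_ref given: falls back to hostname plus whole seconds.
puts AdhocGroupSketch.default_persist_ref({})

# Explicit :persist_ref is returned unchanged.
puts AdhocGroupSketch.default_persist_ref({:persist_ref => "my_listener"})
```

Because Time#to_i truncates to whole seconds, two listeners started without :persist_ref on the same host within the same second would get the same generated value. Passing an explicit :persist_ref avoids relying on the timestamp.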

Version data entries

3 entries across 3 versions & 1 rubygem

Version Path
manageiq-messaging-1.0.2 lib/manageiq/messaging/kafka/topic.rb
manageiq-messaging-1.0.1 lib/manageiq/messaging/kafka/topic.rb
manageiq-messaging-1.0.0 lib/manageiq/messaging/kafka/topic.rb