Sha256: 59d342205599f30058ab7ccf9ae2ecd3033dd7ba96c3a3532f64e6df6932013f

Contents?: true

Size: 900 Bytes

Versions: 3

Compression:

Stored size: 900 Bytes

Contents

module ManageIQ
  module Messaging
    module Kafka
      # Topic publish/subscribe implementation for the Kafka messaging layer.
      #
      # This module only wires calls together. The helpers it invokes
      # (+raw_publish+, +topic_for_publish+, +address+, +topic_consumer+,
      # +kafka_client+, +auto_ack?+ and +process_topic_message+) are not
      # defined here and must be supplied by the object that mixes it in.
      module Topic
        private

        # Publishes to a topic.
        #
        # +options+ is handed unchanged to +topic_for_publish+; the result is
        # splatted into +raw_publish+ after a leading +true+ argument.
        #
        # @param options [Hash] publish options, interpreted by +topic_for_publish+
        # @return whatever +raw_publish+ returns
        def publish_topic_impl(options)
          publish_args = topic_for_publish(options)
          raw_publish(true, *publish_args)
        end

        # Subscribes to a topic and forwards every received message, together
        # with the caller's block, to +process_topic_message+.
        #
        # When +options[:persist_ref]+ is truthy, messages are read through the
        # consumer returned by +topic_consumer+; otherwise they are read
        # directly from +kafka_client+. Both paths pass
        # +:start_from_beginning => false+.
        #
        # @param options [Hash] resolved to a topic by +address+; +:persist_ref+
        #   selects the consumer path, and +auto_ack?+ receives the whole hash
        # @param block forwarded to +process_topic_message+ for each message
        # @return whatever the selected +each_message+ call returns
        def subscribe_topic_impl(options, &block)
          topic = address(options)
          group_ref = options[:persist_ref]

          # One shared per-message handler for both paths. A plain (non-lambda)
          # proc is used so that, passed with &, it behaves like a literal block.
          deliver = proc { |incoming| process_topic_message(topic, incoming, &block) }

          unless group_ref
            return kafka_client.each_message(:topic => topic, :start_from_beginning => false, &deliver)
          end

          subscriber = topic_consumer(group_ref)
          subscriber.subscribe(topic, :start_from_beginning => false)
          subscriber.each_message(:automatically_mark_as_processed => auto_ack?(options), &deliver)
        end
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
manageiq-messaging-0.1.3 lib/manageiq/messaging/kafka/topic.rb
manageiq-messaging-0.1.2 lib/manageiq/messaging/kafka/topic.rb
manageiq-messaging-0.1.1 lib/manageiq/messaging/kafka/topic.rb