Sha256: 58694b0c996b1e59ca0af9ced26a9d75a974d334430bd235adfd515ec56e27c2

Contents?: true

Size: 975 Bytes

Versions: 4

Compression:

Stored size: 975 Bytes

Contents

module ManageIQ
  module Messaging
    module Kafka
      module Topic
        private

        def publish_topic_impl(options)
          raw_publish(true, *topic_for_publish(options))
        end

        def subscribe_topic_impl(options, &block)
          topic = address(options)
          persist_ref     = options[:persist_ref]
          session_timeout = options[:session_timeout]

          if persist_ref
            consumer = topic_consumer(persist_ref, session_timeout)
            consumer.subscribe(topic, :start_from_beginning => false)
            consumer.each_message(:automatically_mark_as_processed => auto_ack?(options)) do |message|
              process_topic_message(topic, message, &block)
            end
          else
            kafka_client.each_message(:topic => topic, :start_from_beginning => false) do |message|
              process_topic_message(topic, message, &block)
            end
          end
        end
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
manageiq-messaging-0.1.7 lib/manageiq/messaging/kafka/topic.rb
manageiq-messaging-0.1.6 lib/manageiq/messaging/kafka/topic.rb
manageiq-messaging-0.1.5 lib/manageiq/messaging/kafka/topic.rb
manageiq-messaging-0.1.4 lib/manageiq/messaging/kafka/topic.rb