Sha256: dd280218af34f1c33ff1c534a537a660b31642ee7bffd5c384bed2eb0cf80c4f

Contents?: true

Size: 1.15 KB

Versions: 7

Compression:

Stored size: 1.15 KB

Contents

module ManageIQ
  module Messaging
    module Kafka
      # Queue-style publish/subscribe implementation helpers for the Kafka client.
      #
      # Every method here delegates to helpers that are not defined in this
      # module (+raw_publish+, +queue_for_publish+, +address+, +consumer+,
      # +wait_for_topic+, +process_queue_message+); they are expected to be
      # provided by whatever object mixes this module in.
      module Queue
        # Prefix prepended to the topic name to build the +:persist_ref+ option
        # used when subscribing. Taken from the QUEUE_MESSAGES_GROUP_PREFIX
        # environment variable when it is set, otherwise a built-in default.
        GROUP_FOR_QUEUE_MESSAGES = ENV.fetch('QUEUE_MESSAGES_GROUP_PREFIX', 'manageiq_messaging_queue_group_').freeze

        private

        # Publishes a single message and waits on the handle returned by
        # +raw_publish+.
        #
        # @param options [Hash] message options, forwarded to +queue_for_publish+
        # @return whatever +wait+ returns on the publish handle
        # @raise [ArgumentError] if a block is supplied
        def publish_message_impl(options)
          raise ArgumentError, "Kafka messaging implementation does not take a block" if block_given?

          raw_publish(*queue_for_publish(options)).wait
        end

        # Publishes a batch of messages. All messages are handed to
        # +raw_publish+ first, and only afterwards is each handle waited on.
        #
        # @param messages [Enumerable<Hash>] one options hash per message
        # @return [Array] the publish handles, in message order
        def publish_messages_impl(messages)
          handles = messages.map { |msg_options| raw_publish(*queue_for_publish(msg_options)) }
          handles.each(&:wait)
        end

        # Subscribes to the topic derived from +options+ and passes every
        # consumed message to +process_queue_message+ together with +block+.
        #
        # The +:wait_for_topic+ option (default +true+, also when given as
        # +nil+) is forwarded to +wait_for_topic+ and is not passed on to the
        # consumer. A +:persist_ref+ entry built from GROUP_FOR_QUEUE_MESSAGES
        # and the topic name is added to the options given to +consumer+.
        #
        # @param options [Hash] subscription options; the caller's hash is left untouched
        # @yield forwarded to +process_queue_message+ for each message
        def subscribe_messages_impl(options, &block)
          # Work on a shallow copy: the method removes and adds keys, and doing
          # that on the caller's hash would break frozen hashes and silently
          # drop :wait_for_topic when the same hash is reused for another call.
          options = options.dup

          wait = options.delete(:wait_for_topic)
          wait = true if wait.nil?

          topic = address(options)
          options[:persist_ref] = GROUP_FOR_QUEUE_MESSAGES + topic

          wait_for_topic(wait) do
            queue_consumer = consumer(true, options)
            queue_consumer.subscribe(topic)
            queue_consumer.each do |message|
              process_queue_message(queue_consumer, topic, message, &block)
            end
          end
        end
      end
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
manageiq-messaging-1.5.0 lib/manageiq/messaging/kafka/queue.rb
manageiq-messaging-1.4.3 lib/manageiq/messaging/kafka/queue.rb
manageiq-messaging-1.4.2 lib/manageiq/messaging/kafka/queue.rb
manageiq-messaging-1.4.1 lib/manageiq/messaging/kafka/queue.rb
manageiq-messaging-1.4.0 lib/manageiq/messaging/kafka/queue.rb
manageiq-messaging-1.3.0 lib/manageiq/messaging/kafka/queue.rb
manageiq-messaging-1.2.0 lib/manageiq/messaging/kafka/queue.rb