Sha256: 4b7fade3bc9f538ced987661efdd586992beb9ffb1bd335f3e7d25f5fb88945c
Contents?: true
Size: 1.01 KB
Versions: 4
Compression:
Stored size: 1.01 KB
Contents
module ManageIQ
  module Messaging
    module Kafka
      # Queue-style publish/subscribe implementation for the Kafka messaging
      # client. The methods here rely on helpers that are not defined in this
      # module (+raw_publish+, +queue_for_publish+, +address+, +consumer+ and
      # +process_queue_message+); the including class is expected to provide
      # them.
      module Queue
        # Prefix prepended to the topic name to build the value stored under
        # +:persist_ref+ for queue subscribers. Can be overridden with the
        # QUEUE_MESSAGES_GROUP_PREFIX environment variable.
        GROUP_FOR_QUEUE_MESSAGES =
          ENV.fetch('QUEUE_MESSAGES_GROUP_PREFIX', 'manageiq_messaging_queue_group_').freeze

        private

        # Publishes a single message and waits on the handle returned by
        # +raw_publish+.
        #
        # @param options [Hash] message options, forwarded to +queue_for_publish+
        # @return whatever +wait+ on the publish handle returns
        # @raise [ArgumentError] if a block is given
        def publish_message_impl(options)
          raise ArgumentError, "Kafka messaging implementation does not take a block" if block_given?

          raw_publish(*queue_for_publish(options)).wait
        end

        # Publishes several messages. Every message is handed to
        # +raw_publish+ first; the returned handles are waited on only
        # afterwards, so the waits do not serialize the publish calls.
        #
        # @param messages [Enumerable<Hash>] one options hash per message
        # @return [Array] the handles returned by +raw_publish+
        def publish_messages_impl(messages)
          handles = messages.map { |msg_options| raw_publish(*queue_for_publish(msg_options)) }
          handles.each(&:wait)
        end

        # Subscribes to the topic derived from +options+ and passes every
        # received message to +process_queue_message+ together with the
        # caller's block.
        #
        # The caller's +options+ hash is left untouched: +:persist_ref+ is
        # added to a copy, so frozen or shared hashes can be passed safely.
        #
        # @param options [Hash] subscription options, forwarded to +address+
        #   and (with +:persist_ref+ added) to +consumer+
        # @yield forwarded to +process_queue_message+ for each message
        def subscribe_messages_impl(options, &block)
          topic = address(options)
          consumer_options = options.merge(:persist_ref => GROUP_FOR_QUEUE_MESSAGES + topic)

          queue_consumer = consumer(true, consumer_options)
          queue_consumer.subscribe(topic)
          queue_consumer.each do |message|
            process_queue_message(queue_consumer, topic, message, &block)
          end
        end
      end
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems