Sha256: 973dd99fda57e67e3e02ce194ab9036d36a8d22836ba385b162f18cf5f4cac51

Contents?: true

Size: 1.97 KB

Versions: 3

Compression:

Stored size: 1.97 KB

Contents

module ManageIQ
  module Messaging
    module Stomp
      # Queue-style publish/subscribe implementation for the STOMP client.
      #
      # Everything here is private and is meant to be mixed into a client that
      # supplies the helpers these methods call but do not define:
      # queue_for_publish, queue_for_subscribe, raw_publish, receive_response,
      # send_response, subscribe, ack, auto_ack?, decode_body, payload_log,
      # logger and publish_message.
      module Queue
        private

        # Publishes a single message; when a block is given, also waits for the
        # reply that matches this request.
        #
        # options keys read here: :sender, :message, :class_name (each copied
        # into a header only when present), :payload (defaults to an empty
        # string) and :service (forwarded to receive_response). The address and
        # the initial headers come from queue_for_publish(options).
        #
        # Returns nil when no block is given; otherwise returns whatever
        # receive_response returns.
        def publish_message_impl(options, &block)
          address, headers = queue_for_publish(options)
          headers[:sender]       = options[:sender] if options[:sender]
          headers[:message_type] = options[:message] if options[:message]
          headers[:class_name]   = options[:class_name] if options[:class_name]

          if block_given?
            # A second-resolution timestamp alone collides when two requests are
            # issued within the same second, so nanoseconds and a random suffix
            # are appended. The id remains a string of decimal digits that
            # begins with the epoch seconds.
            now = Time.now
            headers[:correlation_id] =
              Kernel.format('%d%09d%06d', now.to_i, now.nsec, Kernel.rand(1_000_000))
          end

          raw_publish(address, options[:payload] || '', headers)

          return unless block_given?

          receive_response(options[:service], headers[:correlation_id], &block)
        end

        # Publishes every entry of +messages+ by handing each one to
        # publish_message, in order.
        def publish_messages_impl(messages)
          messages.each { |msg_options| publish_message(msg_options) }
        end

        # Subscribes to the queue resolved by queue_for_subscribe(options) and
        # yields each incoming message to the caller's block, wrapped in a
        # one-element Array of ManageIQ::Messaging::ReceivedMessage.
        #
        # When the message carries a 'correlation_id' header, the block's result
        # (its first element if the result is an Array) is passed to
        # send_response together with options[:service].
        #
        # Any StandardError raised while handling a message (including by ack or
        # by the caller's block) is logged and not re-raised.
        def subscribe_messages_impl(options)
          queue_name, headers = queue_for_subscribe(options)

          # for STOMP we can get message one at a time
          subscribe(queue_name, headers) do |msg|
            begin
              ack(msg) if auto_ack?(options)

              sender = msg.headers['sender']
              message_type = msg.headers['message_type']
              message_body = decode_body(msg.headers, msg.body)
              logger.info("Message received: queue(#{queue_name}), msg(#{payload_log(message_body)}), headers(#{msg.headers})")

              result = yield [ManageIQ::Messaging::ReceivedMessage.new(sender, message_type, message_body, msg)]
              logger.info("Message processed")

              correlation_ref = msg.headers['correlation_id']
              if correlation_ref
                result = result.first if result.kind_of?(Array)
                send_response(options[:service], correlation_ref, result)
              end
            rescue => e
              logger.error("Message processing error: #{e.message}")
              # Exception#backtrace may be nil; Array() keeps the error handler
              # itself from raising in that case.
              logger.error(Array(e.backtrace).join("\n"))
            end
          end
        end
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
manageiq-messaging-0.1.3 lib/manageiq/messaging/stomp/queue.rb
manageiq-messaging-0.1.2 lib/manageiq/messaging/stomp/queue.rb
manageiq-messaging-0.1.1 lib/manageiq/messaging/stomp/queue.rb