# Copyright (c) 2012 National ICT Australia Limited (NICTA). # This software may be used and distributed solely under the terms of the MIT license (License). # You should find a copy of the License in LICENSE.TXT or at http://opensource.org/licenses/MIT. # By downloading or using this software you accept the terms and the liability disclaimer in the License. require 'omf_common/comm/amqp/amqp_mp' module OmfCommon class Comm class AMQP class Topic < OmfCommon::Comm::Topic def to_s @address end def address @address end # Call 'block' when topic is subscribed to underlying messaging # infrastructure. # def on_subscribed(&block) return unless block call_now = false @lock.synchronize do if @subscribed call_now = true else @on_subscribed_handlers << block end end if call_now after(0, &block) end end private def initialize(id, opts = {}) unless @communicator = opts.delete(:communicator) raise "Missing :communicator option" end super @address = opts[:address] @lock = Monitor.new @subscribed = false @on_subscribed_handlers = [] # @communicator.on_reconnect(self) do # info "Resubscribe '#{self}'" # _init_amqp # end _init_amqp end def _init_amqp() channel = @communicator.channel @exchange = channel.topic(id, :auto_delete => true) # @exchange.on_connection_interruption do |ex| # warn "Exchange #{ex.name} detected connection interruption" # @exchange = nil # end channel.queue("", :exclusive => true, :auto_delete => true) do |queue| #puts "QQ1(#{id}): #{queue}" queue.bind(@exchange) queue.subscribe do |headers, payload| #puts "===(#{id}) Incoming message '#{headers.content_type}'" debug "Received message on #{@address}" MPReceived.inject(Time.now.to_f, @address, payload.to_s[/mid\":\"(.{36})/, 1]) if OmfCommon::Measure.enabled? Message.parse(payload, headers.content_type) do |msg| #puts "---(#{id}) Parsed message '#{msg}'" on_incoming_message(msg) end end debug "Subscribed to '#@id'" # Call all accumulated on_subscribed handlers @lock.synchronize do @subscribed = true @on_subscribed_handlers.each do |block| after(0, &block) end @on_subscribed_handlers = nil end end end def _send_message(msg, block = nil) super content_type, content = msg.marshall(self) debug "(#{id}) Send message (#{content_type}) #{msg.inspect}" if @exchange @exchange.publish(content, content_type: content_type, message_id: msg.mid) MPPublished.inject(Time.now.to_f, @address, msg.mid) if OmfCommon::Measure.enabled? else warn "Unavailable AMQP channel. Dropping message '#{msg}'" end end end # class end # module end # module end # module