require 'eventmachine' require 'amqp' require 'rflow' class RFlow module Components module AMQP # By default will use an exclusive, non-durable, auto-deleting, # randomly named queue for subscribing to topic messages with an # empty-string pattern class Subscriber < RFlow::Component output_port :amqp_port output_port :raw_port attr_accessor :config, :queue_config DEFAULT_CONFIG = { 'server' => '127.0.0.1', 'port' => 5672, 'username' => 'guest', 'password' => 'guest', 'vhost' => '/', 'queue_name' => 'asdf', 'queue_passive' => false, 'queue_durable' => false, 'queue_exclusive' => true, 'queue_auto_delete' => true, 'binding_pattern' => '', } def configure!(config) @config = DEFAULT_CONFIG.merge config @config['port'] = @config['port'].to_i ['durable', 'passive', 'exclusive', 'auto_delete'].each do |opt| @config["queue_#{opt}"] = to_boolean(@config["queue_#{opt}"]) end # Convert the queue parameters into AMQP-friendly sym-keyed # Hash that can be passed directly to underlying AMQP gem # methods @queue_config = @config.each_with_object({}) do |(key, value), result| md = /queue_(.*)/.match(key.to_s) result[md[1].to_sym] = value unless md.nil? end end def run! ::AMQP.connect(:host => @config['server'], :port => @config['port'], :vhost => @config['vhost'], :username => @config['username'], :password => @config['password']) do |conn| @amqp_connection = conn ::AMQP::Channel.new(@amqp_connection) do |channel| @amqp_channel = channel @amqp_exchange = @amqp_channel.topic ::AMQP::Queue.new(@amqp_channel, @config['queue_name'], @queue_config) do |queue| @amqp_queue = queue @amqp_queue.bind(@amqp_exchange, :routing_key => @config['binding_pattern']).subscribe(:ack => true) do |header, payload| RFlow.logger.debug { "#{name}: AMQP message received" } processing_event = RFlow::Message::ProcessingEvent.new(instance_uuid, Time.now.utc).tap do |e| e.completed_at = Time.now.utc end amqp_port.send_message(RFlow::Message.new('RFlow::Message::Data::AMQP::Message').tap do |m| header.to_hash.each {|k,v| m.data.header[k.to_s] = v } m.data.payload = payload m.provenance << processing_event end) # TODO: Optimize out if not connected raw_port.send_message(RFlow::Message.new('RFlow::Message::Data::Raw').tap do |m| m.data.raw = payload m.provenance << processing_event end) header.ack end end end end end def to_boolean(string) case string when /^true$/i, '1', true; true when /^false/i, '0', false; false else raise ArgumentError, "'#{string}' cannot be coerced to a boolean value" end end end end end end