lib/rosetta_queue/adapters/amqp.rb in bmabey-rosetta_queue-0.1.3 vs lib/rosetta_queue/adapters/amqp.rb in bmabey-rosetta_queue-0.2.0

- old
+ new

@@ -1,166 +1,45 @@ -require 'mq' - -# AMQP -# connections: -# - unlike stomp, we can share one connection across multiple channels -# - set host and authorization options on AMQP.connect -# -# MQ -# create new channel for an AMQP connection -# options for queue and exchange objects include: -# - :durable => true -# - :ack => "client" ???? - module RosettaQueue module Gateway - class AmqpAdapter < BaseAdapter + module Fanout + def fanout_name_for(destination) + fanout_name = destination.gsub(/(topic|fanout)\/(.*)/, '\2') + raise AdapterException, "Unable to discover fanout exchange. Cannot bind queue to exchange!" unless fanout_name + fanout_name + end + end - def initialize(user, pass, host, port=nil) - @user, @pass, @host, @port = user, pass, host, port + class Amqp < BaseAdapter + + def initialize(adapter_settings = {}) + raise AdapterException, "Missing adapter settings" if adapter_settings.empty? + @adapter_settings = adapter_settings end - def disconnect; end + def delete(destination, opts={}) + exchange_strategy_for(destination, opts).delete(destination) + end + def disconnect(message_handler); end + def receive_once(destination, opts={}) - exchange_strategy_for(destination).do_single_exchange(destination, opts) do |msg| + exchange_strategy_for(destination, opts).receive_once(destination) do |msg| return msg end end def receive_with(message_handler) + options = options_for(message_handler) destination = destination_for(message_handler) - exchange_strategy_for(destination).do_exchange(destination, message_handler) + exchange_strategy_for(destination, options).receive(destination, message_handler) end def send_message(destination, message, options=nil) - exchange_strategy_for(destination).publish_to_exchange(destination, message, options) + exchange_strategy_for(destination, options).publish(destination, message) end def unsubscribe; end - - def exchange_strategy_for(destination) - case destination - when /(topic|fanout)/ - @exchange ||= FanoutExchange.new(@user, @pass, @host) - when /queue/ - @exchange ||= DirectExchange.new(@user, @pass, @host) - else - @exchange ||= DirectExchange.new(@user, @pass, @host) - end - end - end - - class BaseExchange - - def initialize(user, pass, host) - @user, @pass, @host = user, pass, host - end - - def publish_to_exchange(destination, message, options={}) - unless EM.reactor_running? - EM.run do - publish_message(destination, message, options) - EM.add_timer(1) { EM.stop_event_loop } - end - else - publish_message(destination, message, options) - end - end - - protected - - def channel - @channel ||= MQ.new(conn) - end - - def conn - # AmqpConnect.connection(@user, @pass, @host) - @conn ||= AMQP.connect(:user => @user, :pass => @pass, :host => @host) - end - - def publish_message(dest, msg, opts) - RosettaQueue.logger.info("Publishing to #{dest} :: #{msg}") - channel.queue(dest).publish(msg, opts) - channel.queue(dest).unsubscribe - end - end - - - class DirectExchange < BaseExchange - - def do_exchange(destination, message_handler) - channel.queue(destination).subscribe do |msg| - RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}") - message_handler.on_message(Filters.process_receiving(msg)) - end - end - - def do_single_exchange(destination, opts={}) - EM.run do - channel.queue(destination).pop do |msg| - RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}") - yield Filters.process_receiving(msg) - end - end - end - end - - - class FanoutExchange < BaseExchange - - def do_exchange(destination, message_handler) - queue = channel.queue("queue_#{self.object_id}") - exchange = channel.fanout(fanout_name_for(destination)) - - queue.bind(exchange).subscribe do |msg| - # channel.queue("queue_#{rand}").bind(channel.fanout(fanout_name_for(destination))).subscribe do |msg| - RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}") - message_handler.on_message(Filters.process_receiving(msg)) - end - end - - def do_single_exchange(destination, opts={}) - EM.run do - queue = channel.queue("queue_#{self.object_id}") - exchange = channel.fanout(fanout_name_for(destination)) - - queue.bind(exchange).pop do |msg| - # channel.queue("queue_#{rand}").bind(channel.fanout(fanout_name_for(destination)), opts).pop do |msg| - RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}") - yield Filters.process_receiving(msg) - end - end - end - - protected - - def publish_message(dest, msg, opts) - exchange = channel.fanout(fanout_name_for(dest)) - exchange.publish(msg, opts) - # channel.fanout(fanout_name_for(dest), :durable => true).publish(msg, opts) - RosettaQueue.logger.info("Publishing to fanout #{dest} :: #{msg}") - end - - private - - def fanout_name_for(destination) - fanout_name = destination.gsub(/(topic|fanout)\/(.*)/, '\2') - raise "Unable to discover fanout exchange. Cannot bind queue to exchange!" unless fanout_name - fanout_name - end - end - - class AmqpConnect - - class << self - - def connection(user, pass, host, port=nil) - @conn ||= AMQP.connect(:user => user, :pass => pass, :host => host) - end - - end end end end