require "spec/helper/all"

# Specs for the EM::Synchrony::AMQP wrappers (connection, channel, queue,
# exchange and consumer). Every example runs its body inside EM.synchrony
# and ends the reactor explicitly with EM.stop.
#
# NOTE(review): all examples call EM::Synchrony::AMQP.connect with no
# arguments, so they presumably need an AMQP broker reachable with the
# library's default settings -- confirm before running in CI.
describe EM::Synchrony::AMQP do
  it "should yield until connection is ready" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      connection.connected?.should be_true
      EM.stop
    end
  end

  it "should yield until disconnection is complete" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      connection.disconnect
      connection.connected?.should be_false
      EM.stop
    end
  end

  it "should yield until the channel is created" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)
      channel.should be_kind_of(EM::Synchrony::AMQP::Channel)
      EM.stop
    end
  end

  # No explicit expectation here: the example passes when the constructor
  # returns without raising and the reactor is stopped.
  it "should yield until the queue is created" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)
      EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queue1", :auto_delete => true)
      EM.stop
    end
  end

  # Same as above, but the queue is declared through the channel helper.
  it "should yield when a queue is created from a channel" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)
      channel.queue("test.em-synchrony.queue1", :auto_delete => true)
      EM.stop
    end
  end

  it "should yield until the exchange is created" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)

      # Direct construction.
      exchange = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.exchange")
      exchange.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)

      # Construction through the per-type channel helpers.
      direct = channel.direct("test.em-synchrony.direct")
      fanout = channel.fanout("test.em-synchrony.fanout")
      topic = channel.topic("test.em-synchrony.topic")
      headers = channel.headers("test.em-synchrony.headers")

      direct.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)
      fanout.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)
      topic.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)
      headers.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)
      EM.stop
    end
  end

  # Two queues are bound to one fanout exchange; the reactor is stopped once
  # the two subscribers together have counted 2 * nb_msg deliveries.
  it "should publish and receive messages" do
    nb_msg = 10
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)
      ex = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.fanout")

      q1 = channel.queue("test.em-synchrony.queues.1", :auto_delete => true)
      q2 = channel.queue("test.em-synchrony.queues.2", :auto_delete => true)

      q1.bind(ex)
      q2.bind(ex)

      nb_q1, nb_q2 = 0, 0
      stop_cb = proc { EM.stop if nb_q1 + nb_q2 == 2 * nb_msg }

      q1.subscribe(:ack => false) do |_meta, msg|
        msg.should match(/^Bonjour [0-9]+/)
        nb_q1 += 1
        stop_cb.call
      end

      q2.subscribe do |_meta, msg|
        msg.should match(/^Bonjour [0-9]+/)
        nb_q2 += 1
        stop_cb.call
      end

      # Publishing happens in its own fiber, pausing 0.1s between messages.
      Fiber.new do
        nb_msg.times do |n|
          ex.publish("Bonjour #{n}")
          EM::Synchrony.sleep(0.1)
        end
      end.resume
    end
  end

  # Two consumers share a single queue; once nb_msg deliveries have been
  # counted in total, both consumers are expected to have received the same
  # number of messages.
  it "should handle several consumers" do
    nb_msg = 10
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)
      exchange = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.consumers.fanout")

      queue = channel.queue("test.em-synchrony.consumers.queue", :auto_delete => true)
      queue.bind(exchange)

      cons1 = EM::Synchrony::AMQP::Consumer.new(channel, queue)
      cons2 = EM::Synchrony::AMQP::Consumer.new(channel, queue)

      nb_cons1, nb_cons2 = 0, 0
      stop_cb = proc do
        if nb_cons1 + nb_cons2 == nb_msg
          nb_cons1.should eq(nb_cons2)
          EM.stop
        end
      end

      cons1.on_delivery do |_meta, msg|
        msg.should match(/^Bonjour [0-9]+/)
        nb_cons1 += 1
        stop_cb.call
      end.consume

      cons2.on_delivery do |_meta, msg|
        msg.should match(/^Bonjour [0-9]+/)
        nb_cons2 += 1
        stop_cb.call
      end.consume

      # Publish exactly nb_msg messages so the stop condition above stays
      # reachable if nb_msg is ever changed.
      nb_msg.times do |n|
        exchange.publish("Bonjour #{n}")
        EM::Synchrony.sleep(0.1)
      end
    end
  end
end