spec/amqp_spec.rb in em-synchrony-1.0.1 vs spec/amqp_spec.rb in em-synchrony-1.0.2

- old
+ new

@@ -1,146 +1,146 @@ -require "spec/helper/all" - -describe EM::Synchrony::AMQP do - - it "should yield until connection is ready" do - EM.synchrony do - connection = EM::Synchrony::AMQP.connect - connection.connected?.should be_true - EM.stop - end - end - - it "should yield until disconnection is complete" do - EM.synchrony do - connection = EM::Synchrony::AMQP.connect - connection.disconnect - connection.connected?.should be_false - EM.stop - end - end - - it "should yield until the channel is created" do - EM.synchrony do - connection = EM::Synchrony::AMQP.connect - channel = EM::Synchrony::AMQP::Channel.new(connection) - channel.should be_kind_of(EM::Synchrony::AMQP::Channel) - EM.stop - end - end - - it "should yield until the queue is created" do - EM.synchrony do - connection = EM::Synchrony::AMQP.connect - channel = EM::Synchrony::AMQP::Channel.new(connection) - queue = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queue1", :auto_delete => true) - EM.stop - end - end - - it "should yield when a queue is created from a channel" do - EM.synchrony do - connection = EM::Synchrony::AMQP.connect - channel = EM::Synchrony::AMQP::Channel.new(connection) - queue = channel.queue("test.em-synchrony.queue1", :auto_delete => true) - EM.stop - end - end - - it "should yield until the exchange is created" do - EM.synchrony do - connection = EM::Synchrony::AMQP.connect - channel = EM::Synchrony::AMQP::Channel.new(connection) - - exchange = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.exchange") - exchange.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) - - direct = channel.direct("test.em-synchrony.direct") - fanout = channel.fanout("test.em-synchrony.fanout") - topic = channel.topic("test.em-synchrony.topic") - headers = channel.headers("test.em-synchrony.headers") - - direct.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) - fanout.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) - topic.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) - headers.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) - EM.stop - end - end - - it "should publish and receive messages" do - nb_msg = 10 - EM.synchrony do - connection = EM::Synchrony::AMQP.connect - channel = EM::Synchrony::AMQP::Channel.new(connection) - ex = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.fanout") - - q1 = channel.queue("test.em-synchrony.queues.1", :auto_delete => true) - q2 = channel.queue("test.em-synchrony.queues.2", :auto_delete => true) - - q1.bind(ex) - q2.bind(ex) - - nb_q1, nb_q2 = 0, 0 - stop_cb = proc { EM.stop if nb_q1 + nb_q2 == 2 * nb_msg } - - q1.subscribe do |meta, msg| - msg.should match(/^Bonjour [0-9]+/) - nb_q1 += 1 - stop_cb.call - end - - q2.subscribe do |meta, msg| - msg.should match(/^Bonjour [0-9]+/) - nb_q2 += 1 - stop_cb.call - end - - Fiber.new do - nb_msg.times do |n| - ex.publish("Bonjour #{n}") - EM::Synchrony.sleep(0.1) - end - end.resume - end - end - - it "should handle several consumers" do - nb_msg = 10 - EM.synchrony do - connection = EM::Synchrony::AMQP.connect - channel = EM::Synchrony::AMQP::Channel.new(connection) - exchange = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.consumers.fanout") - - queue = channel.queue("test.em-synchrony.consumers.queue", :auto_delete => true) - queue.bind(exchange) - - cons1 = EM::Synchrony::AMQP::Consumer.new(channel, queue) - cons2 = EM::Synchrony::AMQP::Consumer.new(channel, queue) - - nb_cons1, nb_cons2 = 0, 0 - stop_cb = Proc.new do - if nb_cons1 + nb_cons2 == nb_msg - nb_cons1.should eq(nb_cons2) - EM.stop - end - end - - cons1.on_delivery do |meta, msg| - msg.should match(/^Bonjour [0-9]+/) - nb_cons1 += 1 - stop_cb.call - end.consume - - cons2.on_delivery do |meta, msg| - msg.should match(/^Bonjour [0-9]+/) - nb_cons2 += 1 - stop_cb.call - end.consume - - 10.times do |n| - exchange.publish("Bonjour #{n}") - EM::Synchrony.sleep(0.1) - end - end - end -end +require "spec/helper/all" + +describe EM::Synchrony::AMQP do + + it "should yield until connection is ready" do + EM.synchrony do + connection = EM::Synchrony::AMQP.connect + connection.connected?.should be_true + EM.stop + end + end + + it "should yield until disconnection is complete" do + EM.synchrony do + connection = EM::Synchrony::AMQP.connect + connection.disconnect + connection.connected?.should be_false + EM.stop + end + end + + it "should yield until the channel is created" do + EM.synchrony do + connection = EM::Synchrony::AMQP.connect + channel = EM::Synchrony::AMQP::Channel.new(connection) + channel.should be_kind_of(EM::Synchrony::AMQP::Channel) + EM.stop + end + end + + it "should yield until the queue is created" do + EM.synchrony do + connection = EM::Synchrony::AMQP.connect + channel = EM::Synchrony::AMQP::Channel.new(connection) + queue = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queue1", :auto_delete => true) + EM.stop + end + end + + it "should yield when a queue is created from a channel" do + EM.synchrony do + connection = EM::Synchrony::AMQP.connect + channel = EM::Synchrony::AMQP::Channel.new(connection) + queue = channel.queue("test.em-synchrony.queue1", :auto_delete => true) + EM.stop + end + end + + it "should yield until the exchange is created" do + EM.synchrony do + connection = EM::Synchrony::AMQP.connect + channel = EM::Synchrony::AMQP::Channel.new(connection) + + exchange = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.exchange") + exchange.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) + + direct = channel.direct("test.em-synchrony.direct") + fanout = channel.fanout("test.em-synchrony.fanout") + topic = channel.topic("test.em-synchrony.topic") + headers = channel.headers("test.em-synchrony.headers") + + direct.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) + fanout.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) + topic.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) + headers.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) + EM.stop + end + end + + it "should publish and receive messages" do + nb_msg = 10 + EM.synchrony do + connection = EM::Synchrony::AMQP.connect + channel = EM::Synchrony::AMQP::Channel.new(connection) + ex = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.fanout") + + q1 = channel.queue("test.em-synchrony.queues.1", :auto_delete => true) + q2 = channel.queue("test.em-synchrony.queues.2", :auto_delete => true) + + q1.bind(ex) + q2.bind(ex) + + nb_q1, nb_q2 = 0, 0 + stop_cb = proc { EM.stop if nb_q1 + nb_q2 == 2 * nb_msg } + + q1.subscribe do |meta, msg| + msg.should match(/^Bonjour [0-9]+/) + nb_q1 += 1 + stop_cb.call + end + + q2.subscribe do |meta, msg| + msg.should match(/^Bonjour [0-9]+/) + nb_q2 += 1 + stop_cb.call + end + + Fiber.new do + nb_msg.times do |n| + ex.publish("Bonjour #{n}") + EM::Synchrony.sleep(0.1) + end + end.resume + end + end + + it "should handle several consumers" do + nb_msg = 10 + EM.synchrony do + connection = EM::Synchrony::AMQP.connect + channel = EM::Synchrony::AMQP::Channel.new(connection) + exchange = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.consumers.fanout") + + queue = channel.queue("test.em-synchrony.consumers.queue", :auto_delete => true) + queue.bind(exchange) + + cons1 = EM::Synchrony::AMQP::Consumer.new(channel, queue) + cons2 = EM::Synchrony::AMQP::Consumer.new(channel, queue) + + nb_cons1, nb_cons2 = 0, 0 + stop_cb = Proc.new do + if nb_cons1 + nb_cons2 == nb_msg + nb_cons1.should eq(nb_cons2) + EM.stop + end + end + + cons1.on_delivery do |meta, msg| + msg.should match(/^Bonjour [0-9]+/) + nb_cons1 += 1 + stop_cb.call + end.consume + + cons2.on_delivery do |meta, msg| + msg.should match(/^Bonjour [0-9]+/) + nb_cons2 += 1 + stop_cb.call + end.consume + + 10.times do |n| + exchange.publish("Bonjour #{n}") + EM::Synchrony.sleep(0.1) + end + end + end +end