spec/integration/eventmachine/basic_consume_spec.rb in amq-client-0.7.0.alpha35 vs spec/integration/eventmachine/basic_consume_spec.rb in amq-client-0.8.0

- old
+ new

@@ -1,16 +1,18 @@ +# encoding: utf-8 + require 'spec_helper' require 'integration/eventmachine/spec_helper' # # Note that this spec doesn't test acknowledgements. # See basic_ack_spec for example with acks # -describe AMQ::Client::EventMachineClient, "Basic.Consume" do +describe AMQ::Client::EventMachineClient, "AMQP consumer" do include EventedSpec::SpecHelper - default_timeout 4 + default_timeout 5 context "sending 100 messages" do let(:messages) { (0..99).map {|i| "Message #{i}" } } it "should receive all the messages" do @@ -30,11 +32,11 @@ messages.each do |message| exchange.publish(message) end end - done(1.5) { + done(3.5) { @received_messages.should =~ messages } end end end # it "should receive all the messages" @@ -60,12 +62,12 @@ exchange.publish(message) end end # We're opening another channel and starting consuming the same queue, - # assuming 1.5 secs was enough to receive all the messages - delayed(1.5) do + # assuming 2.0 secs was enough to receive all the messages + delayed(2.0) do other_channel = AMQ::Client::Channel.new(client, 2) other_channel.open do other_channel.qos(0, 1) # Maximum prefetch size = 1 other_queue = AMQ::Client::Queue.new(client, other_channel, queue.name) other_queue.consume(true) do |amq_method| @@ -77,13 +79,95 @@ end end - done(2.5) { + done(4.5) { @rereceived_messages.should == [] @received_messages.should =~ messages } end end # it "should not leave messages in the queues with noack=true" end # context "sending 100 messages" end # describe AMQ::Client::EventMachineClient, "Basic.Consume" + + + + +describe "Multiple", AMQ::Client::Async::Consumer do + include EventedSpec::SpecHelper + default_timeout 4 + + context "sharing the same queue with equal prefetch levels" do + let(:messages) { (0..99).map {|i| "Message #{i}" } } + + it "have messages distributed to them in the round-robin manner" do + @consumer1_mailbox = [] + @consumer2_mailbox = [] + @consumer3_mailbox = [] + + em_amqp_connect do |client| + channel = AMQ::Client::Channel.new(client, 1) + channel.open do + queue = AMQ::Client::Queue.new(client, channel).declare(false, false, false, true) + queue.bind("amq.fanout") + + consumer1 = AMQ::Client::Async::Consumer.new(channel, queue, "#{queue.name}-consumer-#{Time.now}") + consumer2 = AMQ::Client::Async::Consumer.new(channel, queue) + consumer3 = AMQ::Client::Async::Consumer.new(channel, queue, "#{queue.name}-consumer-#{rand}-#{Time.now}", false, true) + + + consumer1.consume.on_delivery do |method, header, payload| + @consumer1_mailbox << payload + end + + consumer2.consume(true).on_delivery do |method, header, payload| + @consumer2_mailbox << payload + end + + consumer3.consume(false) do + puts "Consumer 3 is ready" + end + consumer3.on_delivery do |method, header, payload| + @consumer3_mailbox << payload + end + + + exchange = AMQ::Client::Exchange.new(client, channel, "amq.fanout", :fanout) + messages.each do |message| + exchange.publish(message) + end + end + + done(1.5) { + @consumer1_mailbox.size.should == 34 + @consumer2_mailbox.size.should == 33 + @consumer3_mailbox.size.should == 33 + } + end + end # it + end # context +end # describe + + + +describe AMQ::Client::Async::Consumer do + include EventedSpec::SpecHelper + default_timeout 4 + + context "registered with a callback" do + it "runs that callback when basic.consume-ok arrives" do + em_amqp_connect do |client| + channel = AMQ::Client::Channel.new(client, 1) + channel.open do + queue = AMQ::Client::Queue.new(client, channel).declare(false, false, false, true) + queue.bind("amq.fanout") + + consumer1 = AMQ::Client::Async::Consumer.new(channel, queue, "#{queue.name}-consumer-#{Time.now}") + consumer1.consume do + done + end # consume do + end # open do + end # em_amqp_connect + end # it + end # context +end # describe