spec/integration/eventmachine/basic_consume_spec.rb in amq-client-0.7.0.alpha35 vs spec/integration/eventmachine/basic_consume_spec.rb in amq-client-0.8.0
- old
+ new
@@ -1,16 +1,18 @@
+# encoding: utf-8
+
require 'spec_helper'
require 'integration/eventmachine/spec_helper'
#
# Note that this spec doesn't test acknowledgements.
# See basic_ack_spec for example with acks
#
-describe AMQ::Client::EventMachineClient, "Basic.Consume" do
+describe AMQ::Client::EventMachineClient, "AMQP consumer" do
include EventedSpec::SpecHelper
- default_timeout 4
+ default_timeout 5
context "sending 100 messages" do
let(:messages) { (0..99).map {|i| "Message #{i}" } }
it "should receive all the messages" do
@@ -30,11 +32,11 @@
messages.each do |message|
exchange.publish(message)
end
end
- done(1.5) {
+ done(3.5) {
@received_messages.should =~ messages
}
end
end
end # it "should receive all the messages"
@@ -60,12 +62,12 @@
exchange.publish(message)
end
end
# We're opening another channel and starting consuming the same queue,
- # assuming 1.5 secs was enough to receive all the messages
- delayed(1.5) do
+ # assuming 2.0 secs was enough to receive all the messages
+ delayed(2.0) do
other_channel = AMQ::Client::Channel.new(client, 2)
other_channel.open do
other_channel.qos(0, 1) # Maximum prefetch size = 1
other_queue = AMQ::Client::Queue.new(client, other_channel, queue.name)
other_queue.consume(true) do |amq_method|
@@ -77,13 +79,95 @@
end
end
- done(2.5) {
+ done(4.5) {
@rereceived_messages.should == []
@received_messages.should =~ messages
}
end
end # it "should not leave messages in the queues with noack=true"
end # context "sending 100 messages"
end # describe AMQ::Client::EventMachineClient, "Basic.Consume"
+
+
+
+
+describe "Multiple", AMQ::Client::Async::Consumer do
+ include EventedSpec::SpecHelper
+ default_timeout 4
+
+ context "sharing the same queue with equal prefetch levels" do
+ let(:messages) { (0..99).map {|i| "Message #{i}" } }
+
+ it "have messages distributed to them in the round-robin manner" do
+ @consumer1_mailbox = []
+ @consumer2_mailbox = []
+ @consumer3_mailbox = []
+
+ em_amqp_connect do |client|
+ channel = AMQ::Client::Channel.new(client, 1)
+ channel.open do
+ queue = AMQ::Client::Queue.new(client, channel).declare(false, false, false, true)
+ queue.bind("amq.fanout")
+
+ consumer1 = AMQ::Client::Async::Consumer.new(channel, queue, "#{queue.name}-consumer-#{Time.now}")
+ consumer2 = AMQ::Client::Async::Consumer.new(channel, queue)
+ consumer3 = AMQ::Client::Async::Consumer.new(channel, queue, "#{queue.name}-consumer-#{rand}-#{Time.now}", false, true)
+
+
+ consumer1.consume.on_delivery do |method, header, payload|
+ @consumer1_mailbox << payload
+ end
+
+ consumer2.consume(true).on_delivery do |method, header, payload|
+ @consumer2_mailbox << payload
+ end
+
+ consumer3.consume(false) do
+ puts "Consumer 3 is ready"
+ end
+ consumer3.on_delivery do |method, header, payload|
+ @consumer3_mailbox << payload
+ end
+
+
+ exchange = AMQ::Client::Exchange.new(client, channel, "amq.fanout", :fanout)
+ messages.each do |message|
+ exchange.publish(message)
+ end
+ end
+
+ done(1.5) {
+ @consumer1_mailbox.size.should == 34
+ @consumer2_mailbox.size.should == 33
+ @consumer3_mailbox.size.should == 33
+ }
+ end
+ end # it
+ end # context
+end # describe
+
+
+
+describe AMQ::Client::Async::Consumer do
+ include EventedSpec::SpecHelper
+ default_timeout 4
+
+ context "registered with a callback" do
+ it "runs that callback when basic.consume-ok arrives" do
+ em_amqp_connect do |client|
+ channel = AMQ::Client::Channel.new(client, 1)
+ channel.open do
+ queue = AMQ::Client::Queue.new(client, channel).declare(false, false, false, true)
+ queue.bind("amq.fanout")
+
+ consumer1 = AMQ::Client::Async::Consumer.new(channel, queue, "#{queue.name}-consumer-#{Time.now}")
+ consumer1.consume do
+ done
+ end # consume do
+ end # open do
+ end # em_amqp_connect
+ end # it
+ end # context
+end # describe