# encoding: utf-8 require "spec_helper" describe "Multiple non-exclusive consumers per queue" do # # Environment # include EventedSpec::AMQPSpec include EventedSpec::SpecHelper default_options AMQP_OPTS default_timeout 10 let(:messages) { (0..99).map {|i| "Message #{i}" } } # # Examples # before :each do @consumer1_mailbox = [] @consumer2_mailbox = [] @consumer3_mailbox = [] end context "with equal prefetch levels" do it "have messages distributed to them in the round-robin manner" do channel = AMQP::Channel.new channel.on_error do |ch, channel_close| raise(channel_close.reply_text) end queue = channel.queue("amqpgem.integration.roundrobin.queue1", :auto_delete => true) do consumer1 = AMQP::Consumer.new(channel, queue) consumer2 = AMQP::Consumer.new(channel, queue, "#{queue.name}-consumer-#{rand}-#{Time.now}", false, true) consumer1.consume.on_delivery do |basic_deliver, metadata, payload| @consumer1_mailbox << payload end consumer2.consume(true).on_delivery do |metadata, payload| @consumer2_mailbox << payload end queue.subscribe do |metadata, payload| @consumer3_mailbox << payload end end exchange = channel.default_exchange exchange.on_return do |basic_return, metadata, payload| raise(basic_return.reply_text) end EventMachine.add_timer(1.0) do messages.each do |message| exchange.publish(message, :immediate => true, :mandatory => true, :routing_key => queue.name) end end done(6.5) { @consumer1_mailbox.size.should == 34 @consumer2_mailbox.size.should == 33 @consumer3_mailbox.size.should == 33 } end # it end # context context "with equal prefetch levels and when queue is server-named" do it "have messages distributed to them in the round-robin manner" do channel = AMQP::Channel.new channel.on_error do |ch, channel_close| raise(channel_close.reply_text) end queue = channel.queue("amqpgem.integration.roundrobin.queue1", :auto_delete => true) do consumer1 = AMQP::Consumer.new(channel, queue) consumer2 = AMQP::Consumer.new(channel, queue, "#{queue.name}-consumer-#{rand}-#{Time.now}", false, true) consumer1.consume.on_delivery do |basic_deliver, metadata, payload| @consumer1_mailbox << payload end consumer2.consume(true).on_delivery do |metadata, payload| @consumer2_mailbox << payload end queue.subscribe do |metadata, payload| @consumer3_mailbox << payload end queue.consumer_tag.should == queue.default_consumer.consumer_tag end exchange = channel.default_exchange exchange.on_return do |basic_return, metadata, payload| raise(basic_return.reply_text) end EventMachine.add_timer(1.0) do messages.each do |message| exchange.publish(message, :immediate => true, :mandatory => true, :routing_key => queue.name) end end done(5.0) { @consumer1_mailbox.size.should == 34 @consumer2_mailbox.size.should == 33 @consumer3_mailbox.size.should == 33 } end # it end # context context "with equal prefetch levels and one consumer cancelled mid-flight" do it "have messages distributed to them in the round-robin manner" do channel = AMQP::Channel.new channel.on_error do |ch, channel_close| raise(channel_close.reply_text) end queue = channel.queue("", :auto_delete => true) consumer1 = AMQP::Consumer.new(channel, queue) consumer2 = AMQP::Consumer.new(channel, queue) consumer1.consume.on_delivery do |basic_deliver, metadata, payload| @consumer1_mailbox << payload end consumer2.consume(true).on_delivery do |metadata, payload| @consumer2_mailbox << payload end queue.subscribe do |metadata, payload| @consumer3_mailbox << payload end consumer2.cancel exchange = channel.default_exchange exchange.on_return do |basic_return, metadata, payload| raise(basic_return.reply_text) end EventMachine.add_timer(1.0) do messages.each do |message| exchange.publish(message, :immediate => true, :mandatory => true, :routing_key => queue.name) end end done(5.0) { @consumer1_mailbox.size.should == 50 @consumer2_mailbox.size.should == 0 @consumer3_mailbox.size.should == 50 } end # it end # context context "with equal prefetch levels and two consumers cancelled mid-flight" do it "have messages distributed to the only active consumer" do channel = AMQP::Channel.new channel.on_error do |ch, channel_close| raise(channel_close.reply_text) end queue = channel.queue("amqpgem.integration.roundrobin.queue1", :auto_delete => true) do consumer1 = AMQP::Consumer.new(channel, queue) consumer2 = AMQP::Consumer.new(channel, queue, "#{queue.name}-consumer-#{rand}-#{Time.now}", false, true) consumer1.consume.on_delivery do |basic_deliver, metadata, payload| @consumer1_mailbox << payload end consumer2.consume(true).on_delivery do |metadata, payload| @consumer2_mailbox << payload end queue.subscribe do |metadata, payload| @consumer3_mailbox << payload end queue.unsubscribe consumer2.cancel end exchange = channel.default_exchange exchange.on_return do |basic_return, metadata, payload| raise(basic_return.reply_text) end EventMachine.add_timer(1.0) do messages.each do |message| exchange.publish(message, :immediate => true, :mandatory => true, :routing_key => queue.name) end end done(5.0) { @consumer1_mailbox.size.should == 100 @consumer2_mailbox.size.should == 0 @consumer3_mailbox.size.should == 0 } end # it end # context context "with equal prefetch levels, a server-named queue and two consumers cancelled mid-flight" do it "have messages distributed to the only active consumer" do channel = AMQP::Channel.new channel.on_error do |ch, channel_close| raise(channel_close.reply_text) end queue = channel.queue("", :auto_delete => true) consumer1 = AMQP::Consumer.new(channel, queue) consumer2 = AMQP::Consumer.new(channel, queue) consumer1.consume.on_delivery do |basic_deliver, metadata, payload| @consumer1_mailbox << payload end consumer2.consume(true).on_delivery do |metadata, payload| @consumer2_mailbox << payload end queue.subscribe do |metadata, payload| @consumer3_mailbox << payload end queue.unsubscribe consumer2.cancel exchange = channel.default_exchange exchange.on_return do |basic_return, metadata, payload| raise(basic_return.reply_text) end EventMachine.add_timer(1.0) do messages.each do |message| exchange.publish(message, :immediate => true, :mandatory => true, :routing_key => queue.name) end end done(5.0) { @consumer1_mailbox.size.should == 100 @consumer2_mailbox.size.should == 0 @consumer3_mailbox.size.should == 0 } end # it end # context context "with equal prefetch levels and ALL consumers cancelled mid-flight" do it "returns all immediate messages" do @returned_messages = [] channel = AMQP::Channel.new channel.on_error do |ch, channel_close| raise(channel_close.reply_text) end queue = channel.queue("amqpgem.integration.roundrobin.queue1", :auto_delete => true) do consumer1 = AMQP::Consumer.new(channel, queue) consumer2 = AMQP::Consumer.new(channel, queue, "#{queue.name}-consumer-#{rand}-#{Time.now}", false, true) consumer1.consume.on_delivery do |basic_deliver, metadata, payload| @consumer1_mailbox << payload end consumer2.consume(true).on_delivery do |metadata, payload| @consumer2_mailbox << payload end queue.subscribe do |metadata, payload| @consumer3_mailbox << payload end queue.should be_subscribed queue.unsubscribe queue.should_not be_subscribed consumer2.should be_subscribed consumer2.callback.should_not be_nil consumer2.cancel consumer2.should_not be_subscribed consumer2.callback.should be_nil consumer1.should be_subscribed consumer1.callback.should_not be_nil consumer1.cancel consumer1.should_not be_subscribed consumer1.callback.should be_nil end exchange = channel.default_exchange exchange.on_return do |basic_return, metadata, payload| @returned_messages << payload end EventMachine.add_timer(1.0) do messages.each do |message| exchange.publish(message, :immediate => true, :mandatory => true, :routing_key => queue.name) end end done(6.0) { @returned_messages.size.should == 100 } end # it end # context end # describe