require "spec_helper"
require "set"

# Integration examples for Bunny::Queue#subscribe.
#
# Each example opens its own connection (see `connection` below), declares a
# uniquely named queue, attaches one or more consumers and then publishes
# through the default exchange using the queue name as the routing key.
# The `sleep` calls give the asynchronous consumers time to register and to
# drain deliveries before expectations run.
describe Bunny::Queue, "#subscribe" do
  # Started connection using the test credentials and vhost.
  let(:connection) do
    c = Bunny.new(username: "bunny_gem", password: "bunny_password", vhost: "bunny_testbed")
    c.start
    c
  end

  after :each do
    connection.close if connection.open?
  end

  context "with automatic acknowledgement mode" do
    let(:queue_name) { "bunny.basic_consume#{rand}" }

    it "registers the consumer" do
      delivered_keys = []
      delivered_data = []

      # The consumer is set up from a separate thread on its own channel.
      t = Thread.new do
        ch = connection.create_channel
        q = ch.queue(queue_name, auto_delete: true, durable: false)
        q.subscribe(exclusive: false, manual_ack: false) do |delivery_info, _properties, payload|
          delivered_keys << delivery_info.routing_key
          delivered_data << payload
        end
      end
      t.abort_on_exception = true
      sleep 0.5

      ch = connection.create_channel
      x = ch.default_exchange
      x.publish("hello", routing_key: queue_name)

      sleep 0.7
      expect(delivered_keys).to include(queue_name)
      expect(delivered_data).to include("hello")

      expect(ch.queue(queue_name, auto_delete: true, durable: false).message_count).to eq 0

      ch.close
    end

    context "with a single consumer" do
      let(:queue_name) { "bunny.basic_consume#{rand}" }

      it "provides delivery tag access" do
        # Only membership is checked below, so a plain Set is sufficient
        # (SortedSet is no longer part of the "set" standard library).
        delivery_tags = Set.new

        cch = connection.create_channel
        q = cch.queue(queue_name, auto_delete: true, durable: false)
        q.subscribe(exclusive: false, manual_ack: false) do |delivery_info, _properties, _payload|
          delivery_tags << delivery_info.delivery_tag
        end
        sleep 0.5

        ch = connection.create_channel
        x = ch.default_exchange
        100.times do
          x.publish("hello", routing_key: queue_name)
        end

        sleep 1.5
        # Expect every tag from 1 through 100 to have been observed.
        100.times do |i|
          expect(delivery_tags).to include(i + 1)
        end

        expect(ch.queue(queue_name, auto_delete: true, durable: false).message_count).to eq 0

        ch.close
      end
    end

    context "with multiple consumers on the same channel" do
      let(:queue_name) { "bunny.basic_consume#{rand}" }

      it "provides delivery tag access" do
        # Only membership is checked below, so a plain Set is sufficient
        # (SortedSet is no longer part of the "set" standard library).
        delivery_tags = Set.new

        cch = connection.create_channel
        q = cch.queue(queue_name, auto_delete: true, durable: false)

        # Seven consumers share the same channel and the same tag collection.
        7.times do
          q.subscribe(exclusive: false, manual_ack: false) do |delivery_info, _properties, _payload|
            delivery_tags << delivery_info.delivery_tag
          end
        end
        sleep 1.0

        ch = connection.create_channel
        x = ch.default_exchange
        100.times do
          x.publish("hello", routing_key: queue_name)
        end

        sleep 1.5
        # Expect every tag from 1 through 100 to have been observed.
        100.times do |i|
          expect(delivery_tags).to include(i + 1)
        end

        expect(ch.queue(queue_name, auto_delete: true, durable: false).message_count).to eq 0

        ch.close
      end
    end
  end

  context "with manual acknowledgement mode" do
    let(:queue_name) { "bunny.basic_consume#{rand}" }

    it "register a consumer with manual acknowledgements mode" do
      delivered_keys = []
      delivered_data = []

      t = Thread.new do
        ch = connection.create_channel
        q = ch.queue(queue_name, auto_delete: true, durable: false)
        q.subscribe(exclusive: false, manual_ack: true) do |delivery_info, _properties, payload|
          delivered_keys << delivery_info.routing_key
          delivered_data << payload

          # Explicitly acknowledge each delivery on the consumer's channel.
          ch.ack(delivery_info.delivery_tag)
        end
      end
      t.abort_on_exception = true
      sleep 0.5

      ch = connection.create_channel
      x = ch.default_exchange
      x.publish("hello", routing_key: queue_name)

      sleep 0.7
      expect(delivered_keys).to include(queue_name)
      expect(delivered_data).to include("hello")

      expect(ch.queue(queue_name, auto_delete: true, durable: false).message_count).to eq 0

      ch.close
    end
  end

  # Repeats the example RUNS times (20 when the RUNS environment variable is
  # not set).
  ENV.fetch("RUNS", 20).to_i.times do |i|
    context "with a queue that already has messages (take #{i})" do
      let(:queue_name) { "bunny.basic_consume#{rand}" }

      it "registers the consumer" do
        delivered_keys = []
        delivered_data = []

        # Fill the queue before any consumer exists.
        ch = connection.create_channel
        q = ch.queue(queue_name, auto_delete: true, durable: false)
        x = ch.default_exchange
        100.times do
          x.publish("hello", routing_key: queue_name)
        end

        sleep 0.7
        expect(q.message_count).to be > 50

        t = Thread.new do
          # Distinct names on purpose: `ch` and `q` already exist in the
          # enclosing example, so assigning to them inside this block would
          # overwrite the publisher's channel and queue from another thread.
          consumer_ch = connection.create_channel
          consumer_q = consumer_ch.queue(queue_name, auto_delete: true, durable: false)
          consumer_q.subscribe(exclusive: false, manual_ack: false) do |delivery_info, _properties, payload|
            delivered_keys << delivery_info.routing_key
            delivered_data << payload
          end
        end
        t.abort_on_exception = true
        sleep 0.5

        expect(delivered_keys).to include(queue_name)
        expect(delivered_data).to include("hello")

        expect(ch.queue(queue_name, auto_delete: true, durable: false).message_count).to eq 0

        ch.close
      end
    end
  end # RUNS.times

  context "after consumer pool has already been shut down" do
    let(:queue_name) { "bunny.basic_consume#{rand}" }

    it "registers the consumer" do
      delivered_keys = []
      delivered_data = []

      t = Thread.new do
        ch = connection.create_channel
        q = ch.queue(queue_name)

        # Subscribe and cancel twice before registering the consumer that is
        # expected to receive the message.
        c1 = q.subscribe(exclusive: false, manual_ack: false) do |_delivery_info, _properties, _payload|
        end
        c1.cancel

        c2 = q.subscribe(exclusive: false, manual_ack: false) do |delivery_info, _properties, payload|
          delivered_keys << delivery_info.routing_key
          delivered_data << payload
        end
        c2.cancel

        q.subscribe(exclusive: false, manual_ack: false) do |delivery_info, _properties, payload|
          delivered_keys << delivery_info.routing_key
          delivered_data << payload
        end
      end
      t.abort_on_exception = true
      sleep 0.5

      ch = connection.create_channel
      x = ch.default_exchange
      x.publish("hello", routing_key: queue_name)

      sleep 0.7
      expect(delivered_keys).to include(queue_name)
      expect(delivered_data).to include("hello")

      expect(ch.queue(queue_name).message_count).to eq 0

      # This queue is declared without auto_delete, so remove it explicitly.
      ch.queue_delete(queue_name)
      ch.close
    end
  end

  context "with uncaught exceptions in delivery handler" do
    context "and defined exception handler" do
      let(:queue_name) { "bunny.basic_consume#{rand}" }

      it "uses exception handler" do
        caught = nil

        t = Thread.new do
          ch = connection.create_channel
          q = ch.queue(queue_name, auto_delete: true, durable: false)

          ch.on_uncaught_exception do |e, _consumer|
            caught = e
          end

          # The handler always raises; the queue name doubles as the message
          # so the expectation below can identify the exception.
          q.subscribe(exclusive: false, manual_ack: false) do |_delivery_info, _properties, _payload|
            raise RuntimeError, queue_name
          end
        end
        t.abort_on_exception = true
        sleep 0.5

        ch = connection.create_channel
        x = ch.default_exchange
        x.publish("hello", routing_key: queue_name)
        sleep 0.5

        expect(caught.message).to eq queue_name

        ch.close
      end
    end

    context "and default exception handler" do
      let(:queue_name) { "bunny.basic_consume#{rand}" }

      it "uses exception handler" do
        caughts = []

        t = Thread.new do
          # Record every call to the connection logger's #error.
          allow(connection.logger).to receive(:error) { |x| caughts << x }

          ch = connection.create_channel
          q = ch.queue(queue_name, auto_delete: true, durable: false)

          q.subscribe(exclusive: false, manual_ack: false) do |_delivery_info, _properties, _payload|
            raise RuntimeError, queue_name
          end
        end
        t.abort_on_exception = true
        sleep 0.5

        ch = connection.create_channel
        x = ch.default_exchange
        5.times { x.publish("hello", routing_key: queue_name) }
        sleep 1.5

        expect(caughts.size).to eq(5)

        ch.close
      end
    end

    context "with a single consumer" do
      let(:queue_name) { "bunny.basic_consume#{rand}" }

      it "provides delivery tag access" do
        # Only membership is checked below, so a plain Set is sufficient
        # (SortedSet is no longer part of the "set" standard library).
        delivery_tags = Set.new

        cch = connection.create_channel
        q = cch.queue(queue_name, auto_delete: true, durable: false)
        q.subscribe(exclusive: false, manual_ack: false) do |delivery_info, _properties, _payload|
          delivery_tags << delivery_info.delivery_tag
        end
        sleep 0.5

        ch = connection.create_channel
        x = ch.default_exchange
        100.times do
          x.publish("hello", routing_key: queue_name)
        end

        sleep 1.5
        # Expect every tag from 1 through 100 to have been observed.
        100.times do |i|
          expect(delivery_tags).to include(i + 1)
        end

        expect(ch.queue(queue_name, auto_delete: true, durable: false).message_count).to eq 0

        ch.close
      end
    end

    context "with multiple consumers on the same channel" do
      let(:queue_name) { "bunny.basic_consume#{rand}" }

      it "provides delivery tag access" do
        # Only membership is checked below, so a plain Set is sufficient
        # (SortedSet is no longer part of the "set" standard library).
        delivery_tags = Set.new

        cch = connection.create_channel
        q = cch.queue(queue_name, auto_delete: true, durable: false)

        # Seven consumers share the same channel and the same tag collection.
        7.times do
          q.subscribe(exclusive: false, manual_ack: false) do |delivery_info, _properties, _payload|
            delivery_tags << delivery_info.delivery_tag
          end
        end
        sleep 1.0

        ch = connection.create_channel
        x = ch.default_exchange
        100.times do
          x.publish("hello", routing_key: queue_name)
        end

        sleep 1.5
        # Expect every tag from 1 through 100 to have been observed.
        100.times do |i|
          expect(delivery_tags).to include(i + 1)
        end

        expect(ch.queue(queue_name, auto_delete: true, durable: false).message_count).to eq 0

        ch.close
      end
    end
  end
end # describe