spec/inputs/rabbitmq_spec.rb in logstash-input-rabbitmq-3.2.0 vs spec/inputs/rabbitmq_spec.rb in logstash-input-rabbitmq-3.3.0

- old
+ new

@@ -1,9 +1,10 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" require "logstash/inputs/rabbitmq" require "thread" +require 'logstash/event' Thread.abort_on_exception = true describe LogStash::Inputs::RabbitMQ do let(:klass) { LogStash::Inputs::RabbitMQ } @@ -70,15 +71,28 @@ context "with an exchange declared" do let(:exchange) { "exchange" } let(:key) { "routing key" } let(:rabbitmq_settings) { super.merge("exchange" => exchange, "key" => key) } - it "should bind to the exchange" do - instance.register - expect(queue).to have_received(:bind).with(exchange, :routing_key => key) + before do + allow(instance).to receive(:declare_exchange!) end + context "on register" do + before do + instance.register + end + + it "should bind to the exchange" do + expect(queue).to have_received(:bind).with(exchange, :routing_key => key) + end + + it "should declare the exchange" do + expect(instance).to have_received(:declare_exchange!) + end + end + context "but not immediately available" do before do i = 0 allow(queue).to receive(:bind).with(any_args) do i += 1 @@ -163,19 +177,25 @@ it "should have the correct prefetch value" do expect(instance.instance_variable_get(:@hare_info).channel.prefetch).to eql(256) end - describe "receiving a message with a queue specified" do - let(:config) { super.merge("queue" => queue_name) } + describe "receiving a message with a queue + exchange specified" do + let(:config) { super.merge("queue" => queue_name, "exchange" => exchange_name, "exchange_type" => "fanout") } let(:event) { output_queue.pop } - let(:queue) { test_channel.queue(queue_name, :auto_delete => true) } + let(:exchange) { test_channel.exchange(exchange_name, :type => "fanout") } + let(:exchange_name) { "logstash-input-rabbitmq-#{rand(0xFFFFFFFF)}" } + #let(:queue) { test_channel.queue(queue_name, :auto_delete => true) } let(:queue_name) { "logstash-input-rabbitmq-#{rand(0xFFFFFFFF)}" } + after do + exchange.delete + end + context "when the message has a payload but no message headers" do before do - queue.publish(message) + exchange.publish(message) end let(:message) { "Foo Message" } it "should process the message and store the payload" do @@ -191,11 +211,11 @@ context "when message properties are available" do before do # Don't test every single property but select a few with # different characteristics to get sufficient coverage. - queue.publish("", + exchange.publish("", :properties => { :app_id => app_id, :timestamp => Java::JavaUtil::Date.new(epoch * 1000), :priority => priority, }) @@ -213,20 +233,20 @@ expect(event["@metadata"]).to include("rabbitmq_properties") props = event["@metadata"]["rabbitmq_properties"] expect(props["app-id"]).to eq(app_id) expect(props["delivery-mode"]).to eq(1) - expect(props["exchange"]).to eq("") + expect(props["exchange"]).to eq(exchange_name) expect(props["priority"]).to eq(priority) - expect(props["routing-key"]).to eq(queue_name) + expect(props["routing-key"]).to eq("") expect(props["timestamp"]).to eq(epoch) end end context "when message headers are available" do before do - queue.publish("", :properties => { :headers => headers }) + exchange.publish("", :properties => { :headers => headers }) end let (:headers) { { "arrayvalue" => [true, 123, "foo"], @@ -243,10 +263,8 @@ end end end describe LogStash::Inputs::RabbitMQ do - it_behaves_like "an interruptible input plugin" do - - end + it_behaves_like "an interruptible input plugin" end end