spec/inputs/rabbitmq_spec.rb in logstash-input-rabbitmq-3.2.0 vs spec/inputs/rabbitmq_spec.rb in logstash-input-rabbitmq-3.3.0
- old
+ new
@@ -1,9 +1,10 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/rabbitmq"
require "thread"
+require 'logstash/event'
Thread.abort_on_exception = true
describe LogStash::Inputs::RabbitMQ do
let(:klass) { LogStash::Inputs::RabbitMQ }
@@ -70,15 +71,28 @@
context "with an exchange declared" do
let(:exchange) { "exchange" }
let(:key) { "routing key" }
let(:rabbitmq_settings) { super.merge("exchange" => exchange, "key" => key) }
- it "should bind to the exchange" do
- instance.register
- expect(queue).to have_received(:bind).with(exchange, :routing_key => key)
+ before do
+ allow(instance).to receive(:declare_exchange!)
end
+ context "on register" do
+ before do
+ instance.register
+ end
+
+ it "should bind to the exchange" do
+ expect(queue).to have_received(:bind).with(exchange, :routing_key => key)
+ end
+
+ it "should declare the exchange" do
+ expect(instance).to have_received(:declare_exchange!)
+ end
+ end
+
context "but not immediately available" do
before do
i = 0
allow(queue).to receive(:bind).with(any_args) do
i += 1
@@ -163,19 +177,25 @@
it "should have the correct prefetch value" do
expect(instance.instance_variable_get(:@hare_info).channel.prefetch).to eql(256)
end
- describe "receiving a message with a queue specified" do
- let(:config) { super.merge("queue" => queue_name) }
+ describe "receiving a message with a queue + exchange specified" do
+ let(:config) { super.merge("queue" => queue_name, "exchange" => exchange_name, "exchange_type" => "fanout") }
let(:event) { output_queue.pop }
- let(:queue) { test_channel.queue(queue_name, :auto_delete => true) }
+ let(:exchange) { test_channel.exchange(exchange_name, :type => "fanout") }
+ let(:exchange_name) { "logstash-input-rabbitmq-#{rand(0xFFFFFFFF)}" }
+ #let(:queue) { test_channel.queue(queue_name, :auto_delete => true) }
let(:queue_name) { "logstash-input-rabbitmq-#{rand(0xFFFFFFFF)}" }
+ after do
+ exchange.delete
+ end
+
context "when the message has a payload but no message headers" do
before do
- queue.publish(message)
+ exchange.publish(message)
end
let(:message) { "Foo Message" }
it "should process the message and store the payload" do
@@ -191,11 +211,11 @@
context "when message properties are available" do
before do
# Don't test every single property but select a few with
# different characteristics to get sufficient coverage.
- queue.publish("",
+ exchange.publish("",
:properties => {
:app_id => app_id,
:timestamp => Java::JavaUtil::Date.new(epoch * 1000),
:priority => priority,
})
@@ -213,20 +233,20 @@
expect(event["@metadata"]).to include("rabbitmq_properties")
props = event["@metadata"]["rabbitmq_properties"]
expect(props["app-id"]).to eq(app_id)
expect(props["delivery-mode"]).to eq(1)
- expect(props["exchange"]).to eq("")
+ expect(props["exchange"]).to eq(exchange_name)
expect(props["priority"]).to eq(priority)
- expect(props["routing-key"]).to eq(queue_name)
+ expect(props["routing-key"]).to eq("")
expect(props["timestamp"]).to eq(epoch)
end
end
context "when message headers are available" do
before do
- queue.publish("", :properties => { :headers => headers })
+ exchange.publish("", :properties => { :headers => headers })
end
let (:headers) {
{
"arrayvalue" => [true, 123, "foo"],
@@ -243,10 +263,8 @@
end
end
end
describe LogStash::Inputs::RabbitMQ do
- it_behaves_like "an interruptible input plugin" do
-
- end
+ it_behaves_like "an interruptible input plugin"
end
end