spec/inputs/rabbitmq_spec.rb in logstash-input-rabbitmq-3.1.5 vs spec/inputs/rabbitmq_spec.rb in logstash-input-rabbitmq-3.2.0
- old
+ new
@@ -110,11 +110,11 @@
end
end
describe "with a live server", :integration => true do
let(:klass) { LogStash::Inputs::RabbitMQ }
- let(:config) { {"host" => "127.0.0.1"} }
+ let(:config) { {"host" => "127.0.0.1", "auto_delete" => true, "codec" => "plain" } }
let(:instance) { klass.new(config) }
let(:hare_info) { instance.instance_variable_get(:@hare_info) }
let(:output_queue) { Queue.new }
# Spawn a connection in the bg and wait up (n) seconds
@@ -132,11 +132,11 @@
end
# Extra time to make sure the consumer can attach
# Without this there's a chance the shutdown code will execute
# before consumption begins. This is tricky to do more elegantly
- sleep 1
+ sleep 4
end
let(:test_connection) { MarchHare.connect(instance.send(:rabbitmq_settings)) }
let(:test_channel) { test_connection.create_channel }
@@ -164,24 +164,88 @@
it "should have the correct prefetch value" do
expect(instance.instance_variable_get(:@hare_info).channel.prefetch).to eql(256)
end
describe "receiving a message with a queue specified" do
- let(:queue_name) { "foo_queue" }
let(:config) { super.merge("queue" => queue_name) }
+ let(:event) { output_queue.pop }
+ let(:queue) { test_channel.queue(queue_name, :auto_delete => true) }
+ let(:queue_name) { "logstash-input-rabbitmq-#{rand(0xFFFFFFFF)}" }
- it "should process the message" do
- message = "Foo Message"
- q = test_channel.queue(queue_name)
- q.publish(message)
+ context "when the message has a payload but no message headers" do
+ before do
+ queue.publish(message)
+ end
- event = output_queue.pop
- expect(event["message"]).to eql(message)
+ let(:message) { "Foo Message" }
+
+ it "should process the message and store the payload" do
+ expect(event["message"]).to eql(message)
+ end
+
+ it "should save an empty message header hash" do
+ expect(event).to include("@metadata")
+ expect(event["@metadata"]).to include("rabbitmq_headers")
+ expect(event["@metadata"]["rabbitmq_headers"]).to eq({})
+ end
end
+
+ context "when message properties are available" do
+ before do
+ # Don't test every single property but select a few with
+ # different characteristics to get sufficient coverage.
+ queue.publish("",
+ :properties => {
+ :app_id => app_id,
+ :timestamp => Java::JavaUtil::Date.new(epoch * 1000),
+ :priority => priority,
+ })
+ end
+
+ let(:app_id) { "myapplication" }
+ # Randomize the epoch we test with but limit its range to signed
+ # ints to not assume all protocols and libraries involved use
+ # unsigned ints for epoch values.
+ let(:epoch) { rand(0x7FFFFFFF) }
+ let(:priority) { 5 }
+
+ it "should save message properties into a @metadata field" do
+ expect(event).to include("@metadata")
+ expect(event["@metadata"]).to include("rabbitmq_properties")
+
+ props = event["@metadata"]["rabbitmq_properties"]
+ expect(props["app-id"]).to eq(app_id)
+ expect(props["delivery-mode"]).to eq(1)
+ expect(props["exchange"]).to eq("")
+ expect(props["priority"]).to eq(priority)
+ expect(props["routing-key"]).to eq(queue_name)
+ expect(props["timestamp"]).to eq(epoch)
+ end
+ end
+
+ context "when message headers are available" do
+ before do
+ queue.publish("", :properties => { :headers => headers })
+ end
+
+ let (:headers) {
+ {
+ "arrayvalue" => [true, 123, "foo"],
+ "boolvalue" => true,
+ "intvalue" => 123,
+ "stringvalue" => "foo",
+ }
+ }
+
+ it "should save message headers into a @metadata field" do
+ expect(event).to include("@metadata")
+ expect(event["@metadata"]).to include("rabbitmq_headers")
+ expect(event["@metadata"]["rabbitmq_headers"]).to include(headers)
+ end
+ end
end
describe LogStash::Inputs::RabbitMQ do
- let(:config) { super.merge("queue" => "foo_queue") }
it_behaves_like "an interruptible input plugin" do
end
end
end