spec/inputs/rabbitmq_spec.rb in logstash-input-rabbitmq-3.1.5 vs spec/inputs/rabbitmq_spec.rb in logstash-input-rabbitmq-3.2.0

- old
+ new

@@ -110,11 +110,11 @@ end end describe "with a live server", :integration => true do let(:klass) { LogStash::Inputs::RabbitMQ } - let(:config) { {"host" => "127.0.0.1"} } + let(:config) { {"host" => "127.0.0.1", "auto_delete" => true, "codec" => "plain" } } let(:instance) { klass.new(config) } let(:hare_info) { instance.instance_variable_get(:@hare_info) } let(:output_queue) { Queue.new } # Spawn a connection in the bg and wait up (n) seconds @@ -132,11 +132,11 @@ end # Extra time to make sure the consumer can attach # Without this there's a chance the shutdown code will execute # before consumption begins. This is tricky to do more elegantly - sleep 1 + sleep 4 end let(:test_connection) { MarchHare.connect(instance.send(:rabbitmq_settings)) } let(:test_channel) { test_connection.create_channel } @@ -164,24 +164,88 @@ it "should have the correct prefetch value" do expect(instance.instance_variable_get(:@hare_info).channel.prefetch).to eql(256) end describe "receiving a message with a queue specified" do - let(:queue_name) { "foo_queue" } let(:config) { super.merge("queue" => queue_name) } + let(:event) { output_queue.pop } + let(:queue) { test_channel.queue(queue_name, :auto_delete => true) } + let(:queue_name) { "logstash-input-rabbitmq-#{rand(0xFFFFFFFF)}" } - it "should process the message" do - message = "Foo Message" - q = test_channel.queue(queue_name) - q.publish(message) + context "when the message has a payload but no message headers" do + before do + queue.publish(message) + end - event = output_queue.pop - expect(event["message"]).to eql(message) + let(:message) { "Foo Message" } + + it "should process the message and store the payload" do + expect(event["message"]).to eql(message) + end + + it "should save an empty message header hash" do + expect(event).to include("@metadata") + expect(event["@metadata"]).to include("rabbitmq_headers") + expect(event["@metadata"]["rabbitmq_headers"]).to eq({}) + end end + + context "when message properties are available" do + before do + # Don't test every single property but select a few with + # different characteristics to get sufficient coverage. + queue.publish("", + :properties => { + :app_id => app_id, + :timestamp => Java::JavaUtil::Date.new(epoch * 1000), + :priority => priority, + }) + end + + let(:app_id) { "myapplication" } + # Randomize the epoch we test with but limit its range to signed + # ints to not assume all protocols and libraries involved use + # unsigned ints for epoch values. + let(:epoch) { rand(0x7FFFFFFF) } + let(:priority) { 5 } + + it "should save message properties into a @metadata field" do + expect(event).to include("@metadata") + expect(event["@metadata"]).to include("rabbitmq_properties") + + props = event["@metadata"]["rabbitmq_properties"] + expect(props["app-id"]).to eq(app_id) + expect(props["delivery-mode"]).to eq(1) + expect(props["exchange"]).to eq("") + expect(props["priority"]).to eq(priority) + expect(props["routing-key"]).to eq(queue_name) + expect(props["timestamp"]).to eq(epoch) + end + end + + context "when message headers are available" do + before do + queue.publish("", :properties => { :headers => headers }) + end + + let (:headers) { + { + "arrayvalue" => [true, 123, "foo"], + "boolvalue" => true, + "intvalue" => 123, + "stringvalue" => "foo", + } + } + + it "should save message headers into a @metadata field" do + expect(event).to include("@metadata") + expect(event["@metadata"]).to include("rabbitmq_headers") + expect(event["@metadata"]["rabbitmq_headers"]).to include(headers) + end + end end describe LogStash::Inputs::RabbitMQ do - let(:config) { super.merge("queue" => "foo_queue") } it_behaves_like "an interruptible input plugin" do end end end