require 'spec_helper' describe ::LogStash::Plugins::Builtin::Pipeline do let(:address) { "fooAdr" } let(:input_options) { { "address" => address }} let(:output_options) { { "send_to" => [address] }} let(:execution_context) { double("execution_context" )} let(:agent) { double("agent") } let(:pipeline_bus) { org.logstash.plugins.pipeline.PipelineBus.new } let(:queue) { Queue.new } let(:input) { ::LogStash::Plugins::Builtin::Pipeline::Input.new(input_options) } let(:output) { ::LogStash::Plugins::Builtin::Pipeline::Output.new(output_options) } let(:inputs) { [input] } let(:event) { ::LogStash::Event.new("foo" => "bar") } before(:each) do allow(execution_context).to receive(:agent).and_return(agent) allow(agent).to receive(:pipeline_bus).and_return(pipeline_bus) inputs.each do |i| allow(i).to receive(:execution_context).and_return(execution_context) end allow(output).to receive(:execution_context).and_return(execution_context) end def wait_input_running(input_plugin) until input_plugin.running? sleep 0.1 end end describe "Input/output pair" do def start_input input.register @input_thread = Thread.new do input.run(queue) end wait_input_running(input) end def stop_input input.do_stop input.do_close @input_thread.join end context "with both initially running" do before(:each) do start_input output.register end describe "sending a message" do before(:each) do output.multi_receive([event]) end subject { queue.pop(true) } it "should not send an object with the same identity, but rather, a clone" do expect(subject).not_to equal(event) end it "should send a clone with the correct data" do expect(subject.to_hash_with_metadata).to match(event.to_hash_with_metadata) end # A clone wouldn't be affected here it "should no longer have the same content if the original event was modified" do event.set("baz", "bot") expect(subject.to_hash_with_metadata).not_to match(event.to_hash_with_metadata) end end after(:each) do stop_input output.do_close end end context "with the input initially stopped" do before(:each) do output.register @receive_thread = Thread.new { output.multi_receive([event]) } end it "should deliver the message once the input comes up" do sleep 3 start_input @receive_thread.join expect(queue.pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata) end after(:each) do stop_input output.do_close end end it "stopped input should process events until upstream outputs stop" do start_input output.register pipeline_bus.setBlockOnUnlisten(true) output.multi_receive([event]) expect(queue.pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata) Thread.new { input.do_stop } sleep 1 output.multi_receive([event]) expect(queue.pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata) output.do_close end end describe "one output to multiple inputs" do describe "with all plugins up" do let(:other_address) { "other" } let(:other_input_options) { { "address" => other_address } } let(:other_input) { ::LogStash::Plugins::Builtin::Pipeline::Input.new(other_input_options) } let(:output_options) { { "send_to" => [address, other_address] } } let(:inputs) { [input, other_input] } let(:queues) { [Queue.new, Queue.new] } let(:inputs_queues) { Hash[inputs.zip(queues)] } before(:each) do input.register other_input.register output.register @input_threads = inputs_queues.map do |input_plugin,input_queue| Thread.new do input_plugin.run(input_queue) end end inputs_queues.each do |input_plugin, input_queue| wait_input_running(input_plugin) end end describe "sending a message" do before(:each) do output.multi_receive([event]) end it "should send the message to both outputs" do inputs_queues.each do |i,q| expect(q.pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata) end end end context "with ensure delivery set to false" do let(:output_options) { super.merge("ensure_delivery" => false) } before(:each) do other_input.do_stop other_input.do_close output.multi_receive([event]) end it "should not send the event to the input that is down" do expect(inputs_queues[input].pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata) expect(inputs_queues[other_input].size).to eql(0) end # Test that the function isn't blocked on the last message # a bug could conceivable cause this to hang it "should not block subsequent sends" do output.multi_receive([event]) expect(inputs_queues[input].pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata) expect(inputs_queues[input].pop(true).to_hash_with_metadata).to match(event.to_hash_with_metadata) expect(inputs_queues[other_input].size).to eql(0) end end after(:each) do inputs.each(&:do_stop) inputs.each(&:do_close) output.do_close @input_threads.each(&:join) end end end end