spec/inputs/tcp_spec.rb in logstash-input-tcp-3.0.1 vs spec/inputs/tcp_spec.rb in logstash-input-tcp-3.0.2

- old
+ new

@@ -3,15 +3,18 @@ require "socket" require "timeout" require "logstash/json" require "logstash/inputs/tcp" require "stud/try" +require "stud/task" require "flores/pki" require "openssl" require_relative "../spec_helper" +#Cabin::Channel.get(LogStash).subscribe(STDOUT) +#Cabin::Channel.get(LogStash).level = :debug describe LogStash::Inputs::Tcp do context "codec (PR #1372)" do it "switches from plain to line" do require "logstash/codecs/plain" @@ -234,232 +237,142 @@ after :each do subject.close rescue nil end - describe "register" do + describe "#register" do it "should register without errors" do expect { subject.register }.to_not raise_error end end - describe "receive" do - - let(:nevents) { 10 } - - let(:events) do - socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } - - result = helper.pipelineless_input(subject, nevents) do - nevents.times do |i| - socket.puts("msg #{i}") - socket.flush - end - end - socket.close rescue nil - - result + describe "#receive" do + shared_examples "receiving events" do + # TODO(sissel): Implement normal event-receipt tests as as a shared example end - before(:each) do - subject.register - end + context "when ssl_enable is true" do + let(:pki) { Flores::PKI.generate } + let(:certificate) { pki[0] } + let(:key) { pki[1] } + let(:certificate_file) { Stud::Temporary.file } + let(:key_file) { Stud::Temporary.file } + let(:queue) { Queue.new } - it "should receive events been generated" do - expect(events.size).to be(nevents) - messages = events.map { |event| event["message"]} - messages.each do |message| - expect(message).to match(/msg \d+/) - end - end - - it "should add the host and port to the generated event" do - events.each do |event| - expect(event["host"]).to eq("127.0.0.1") - expect(event["port"]).to be_an(Fixnum) - end - end - - describe "ssl" do - - let(:certificate) { helper.certificate } - - subject(:input) { LogStash::Plugin.lookup("input", "tcp").new(config) } - let(:config) do { - "host" => "0.0.0.0", + "host" => "127.0.0.1", "port" => port, - "ssl_verify" => false, "ssl_enable" => true, - "ssl_cert" => certificate[0].path, - "ssl_key" => certificate[1].path + "ssl_cert" => certificate_file.path, + "ssl_key" => key_file.path, + + # Trust our self-signed cert. + # TODO(sissel): Make this a separate certificate for the client + "ssl_extra_chain_certs" => certificate_file.path } end - let(:events) do - socket = Stud::try(5.times) do - ssl_context = OpenSSL::SSL::SSLContext.new - socket = TCPSocket.new("127.0.0.1", port) - OpenSSL::SSL::SSLSocket.new(socket, ssl_context) - end + subject(:input) { LogStash::Plugin.lookup("input", "tcp").new(config) } - result = helper.pipelineless_input(subject, nevents) do - socket.connect - nevents.times do |i| - socket.puts("msg #{i}") - socket.flush - end - end - socket.close rescue nil + before do + certificate_file.write(certificate) + key_file.write(key) - result + # Close to flush the file writes. + certificate_file.close + key_file.close + subject.register end - it "should receive events" do - expect(events.size).to be(nevents) + after do + File.unlink(certificate_file.path) + File.unlink(key_file.path) end - it "should not contain sslsubject" do - events.each do |event| - expect(event["sslsubject"]).to be_nil - end - end + context "with a poorly-behaving client" do + let!(:input_task) { Stud::Task.new { input.run(queue) } } - describe "when ssl_verify is on" do + after { input.close } - let(:chain_of_certificates) { helper.chain_of_certificates } + context "that disconnects before doing TLS handshake" do + before do + client = TCPSocket.new("127.0.0.1", port) + client.close + end - let(:ssl_context) do - ssl_context = OpenSSL::SSL::SSLContext.new - ssl_context.cert = OpenSSL::X509::Certificate.new(client_certificate) - ssl_context.key = OpenSSL::PKey::RSA.new(client_key) - ssl_context + it "should not negatively impact the plugin" do + # TODO(sissel): Look for a better way to detect this failure + # besides a sleep/wait. + result = input_task.thread.join(0.5) + expect(result).to be_nil + end end - context "and the verification fails" do + context "that sends garbage instead of TLS handshake" do + let!(:input_task) { Stud::Task.new { input.run(queue) } } + let(:max_length) { 1000 } + let(:garbage) { Flores::Random.iterations(max_length).collect { Flores::Random.integer(1...255) }.pack("C*") } + before do + # Assertion to verify this test is actually sending something. + expect(garbage.length).to be > 0 - let(:config) do - { - "host" => "0.0.0.0", - "port" => port, - "ssl_enable" => true, - "ssl_verify" => true, - "ssl_cert" => chain_of_certificates[:a_cert].path, - "ssl_key" => chain_of_certificates[:a_key].path - } - end - - let(:client_certificate) { File.read(chain_of_certificates[:b_cert].path) } - let(:client_key) { File.read(chain_of_certificates[:b_key].path) } - - let(:socket) do client = TCPSocket.new("127.0.0.1", port) - OpenSSL::SSL::SSLSocket.new(client, ssl_context) + client.write(garbage) + client.flush + Thread.new { sleep(1); client.close } end - - it "should raise an exception when connecting" do - helper.pipelineless_input(subject, 0) do - expect { socket.connect }.to raise_error - socket.close rescue nil - end + it "should not negatively impact the plugin" do + # TODO(sissel): Look for a better way to detect this failure besides a sleep/wait. + result = input_task.thread.join(0.5) + expect(result).to be_nil end end - context "and using the root CA" do + context "connection was healthy but now has garbage or corruption" do + let!(:input_task) { Stud::Task.new { input.run(queue) } } + let(:tcp) { TCPSocket.new("127.0.0.1", port) } + let(:sslcontext) { OpenSSL::SSL::SSLContext.new } + let(:sslsocket) { OpenSSL::SSL::SSLSocket.new(tcp, sslcontext) } + let(:max_length) { 1000 } + let(:garbage) { Flores::Random.iterations(max_length).collect { Flores::Random.integer(1...255) }.pack("C*") } - let(:config) do - { - "host" => "0.0.0.0", - "port" => port, - "ssl_enable" => true, - "ssl_verify" => true, - "ssl_cert" => chain_of_certificates[:a_cert].path, - "ssl_key" => chain_of_certificates[:a_key].path, - "ssl_cacert" => chain_of_certificates[:root_ca].path - } - end + before do + sslcontext.cert = certificate + sslcontext.key = key + sslcontext.verify_mode = OpenSSL::SSL::VERIFY_NONE - let(:client_certificate) { File.read(chain_of_certificates[:aa_cert].path) } - let(:client_key) { File.read(chain_of_certificates[:aa_key].path) } + sslsocket.connect + sslsocket.write("Hello world\n") - let(:events) do - socket = Stud::try(5.times) do - socket = TCPSocket.new("127.0.0.1", port) - OpenSSL::SSL::SSLSocket.new(socket, ssl_context) - end - - result = helper.pipelineless_input(subject, nevents) do - socket.connect - nevents.times do |i| - socket.puts("msg #{i}") - socket.flush - end - end - - socket.close rescue nil - - result + # Assertion to verify this test is actually sending something. + expect(garbage.length).to be > 0 + tcp.write(garbage) + tcp.flush + sslsocket.close + tcp.close end - it "should receive events" do - expect(events.size).to be(nevents) + it "should not negatively impact the plugin" do + # TODO(sissel): Look for a better way to detect this failure besides a sleep/wait. + result = input_task.thread.join(0.5) + expect(result).to be_nil end - - it "should contain sslsubject" do - events.each do |event| - expect(event["sslsubject"]).to eq("/DC=org/DC=ruby-lang/CN=RubyAA_Cert") - end - end end + end - context "using an extra chain of certificates" do + # TODO(sissel): Spec multiple clients where only one is bad. - let(:config) do - { - "host" => "0.0.0.0", - "port" => port, - "ssl_enable" => true, - "ssl_verify" => true, - "ssl_cert" => chain_of_certificates[:b_cert].path, - "ssl_key" => chain_of_certificates[:b_key].path, - "ssl_extra_chain_certs" => [ chain_of_certificates[:root_ca].path, chain_of_certificates[:a_cert].path, chain_of_certificates[:b_cert].path ] - } - end + context "with client certificate problems" do + context "using an expired certificate" + context "using an untrusted certificate" + end - let(:client_certificate) { File.read(chain_of_certificates[:c_cert].path) } - let(:client_key) { File.read(chain_of_certificates[:c_key].path) } - - let(:events) do - socket = Stud::try(5.times) do - socket = TCPSocket.new("127.0.0.1", port) - OpenSSL::SSL::SSLSocket.new(socket, ssl_context) - end - - result = helper.pipelineless_input(subject, nevents) do - socket.connect - nevents.times do |i| - socket.puts("msg #{i}") - socket.flush - end - end - - socket.close rescue nil - - result - end - - it "should receive events" do - expect(events.size).to be(nevents) - end - - it "should contain sslsubject" do - events.each do |event| - expect(event["sslsubject"]).to eq("/DC=org/DC=ruby-lang/CN=RubyC_Cert") - end - end - end + context "with a good connection" do + # TODO(sissel): use shared example + include_examples "receiving events" end + end end it_behaves_like "an interruptible input plugin" do let(:config) { { "port" => port } }