spec/inputs/tcp_spec.rb in logstash-input-tcp-1.0.0 vs spec/inputs/tcp_spec.rb in logstash-input-tcp-2.0.1

- old
+ new

@@ -3,30 +3,30 @@ require "socket" require "timeout" require "logstash/json" require "logstash/inputs/tcp" require 'stud/try' +require_relative "../spec_helper" describe LogStash::Inputs::Tcp do - context "codec (PR #1372)" do it "switches from plain to line" do require "logstash/codecs/plain" require "logstash/codecs/line" plugin = LogStash::Inputs::Tcp.new("codec" => LogStash::Codecs::Plain.new, "port" => 0) plugin.register insist { plugin.codec }.is_a?(LogStash::Codecs::Line) - plugin.teardown + plugin.close end it "switches from json to json_lines" do require "logstash/codecs/json" require "logstash/codecs/json_lines" plugin = LogStash::Inputs::Tcp.new("codec" => LogStash::Codecs::JSON.new, "port" => 0) plugin.register insist { plugin.codec }.is_a?(LogStash::Codecs::JSONLines) - plugin.teardown + plugin.close end end it "should read plain with unicode" do event_count = 10 @@ -37,12 +37,10 @@ port => #{port} } } CONFIG - - events = input(conf) do |pipeline, queue| socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } event_count.times do |i| # unicode smiley for testing unicode support! socket.puts("#{i} ☹") @@ -212,37 +210,69 @@ event_count.times do |i| insist { events[i]["message"] } == "#{i}" end end - it "should connection threads are cleaned up when connection is closed" do - event_count = 10 - port = 5517 - conf = <<-CONFIG - input { - tcp { - port => #{port} - } - } - CONFIG + # below are new specs added in the context of the shutdown semantic refactor. + # TODO: + # - refactor all specs using this new model + # - pipelineless_input has been basically copied from the udp input specs, it should be DRYied up + # - see if we should miminc the udp input UDPClient helper class instead of directly using TCPSocket - input(conf) do |pipeline, queue| - inputs = pipeline.instance_variable_get("@inputs") - insist { inputs.size } == 1 + describe "LogStash::Inputs::Tcp new specs style" do - sockets = event_count.times.map do |i| + before do + srand(RSpec.configuration.seed) + end + + let(:port) { rand(1024..65535) } + subject { LogStash::Plugin.lookup("input", "tcp").new({ "port" => port }) } + + after :each do + subject.close rescue nil + end + + describe "register" do + it "should register without errors" do + expect { subject.register }.to_not raise_error + end + end + + describe "receive" do + + let(:nevents) { 10 } + + let(:events) do socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } - socket.puts("#{i}") - socket.flush - socket + + result = pipelineless_input(subject, nevents) do + nevents.times do |i| + socket.puts("msg #{i}") + socket.flush + end + end + + socket.close rescue nil + + result end - client_threads = inputs[0].instance_variable_get("@client_threads") + before(:each) do + subject.register + end - # close all sockets and make sure there is not more pending threads - sockets.each{|socket| socket.close} + it "should receive events been generated" do + expect(events.size).to be(nevents) + messages = events.map { |event| event["message"]} + messages.each do |message| + expect(message).to match(/msg \d+/) + end + end - Timeout.timeout(1) {sleep 0.1 while client_threads.size > 0} - insist { client_threads.size } == 0 # this check is actually useless per previous line end + + it_behaves_like "an interruptible input plugin" do + let(:config) { { "port" => port } } + end end + end