spec/inputs/tcp_spec.rb in logstash-input-tcp-1.0.0 vs spec/inputs/tcp_spec.rb in logstash-input-tcp-2.0.1
- old
+ new
@@ -3,30 +3,30 @@
require "socket"
require "timeout"
require "logstash/json"
require "logstash/inputs/tcp"
require 'stud/try'
+require_relative "../spec_helper"
describe LogStash::Inputs::Tcp do
-
context "codec (PR #1372)" do
it "switches from plain to line" do
require "logstash/codecs/plain"
require "logstash/codecs/line"
plugin = LogStash::Inputs::Tcp.new("codec" => LogStash::Codecs::Plain.new, "port" => 0)
plugin.register
insist { plugin.codec }.is_a?(LogStash::Codecs::Line)
- plugin.teardown
+ plugin.close
end
it "switches from json to json_lines" do
require "logstash/codecs/json"
require "logstash/codecs/json_lines"
plugin = LogStash::Inputs::Tcp.new("codec" => LogStash::Codecs::JSON.new, "port" => 0)
plugin.register
insist { plugin.codec }.is_a?(LogStash::Codecs::JSONLines)
- plugin.teardown
+ plugin.close
end
end
it "should read plain with unicode" do
event_count = 10
@@ -37,12 +37,10 @@
port => #{port}
}
}
CONFIG
-
-
events = input(conf) do |pipeline, queue|
socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) }
event_count.times do |i|
# unicode smiley for testing unicode support!
socket.puts("#{i} ☹")
@@ -212,37 +210,69 @@
event_count.times do |i|
insist { events[i]["message"] } == "#{i}"
end
end
- it "should connection threads are cleaned up when connection is closed" do
- event_count = 10
- port = 5517
- conf = <<-CONFIG
- input {
- tcp {
- port => #{port}
- }
- }
- CONFIG
+ # below are new specs added in the context of the shutdown semantic refactor.
+ # TODO:
+ # - refactor all specs using this new model
+ # - pipelineless_input has been basically copied from the udp input specs, it should be DRYied up
+ # - see if we should miminc the udp input UDPClient helper class instead of directly using TCPSocket
- input(conf) do |pipeline, queue|
- inputs = pipeline.instance_variable_get("@inputs")
- insist { inputs.size } == 1
+ describe "LogStash::Inputs::Tcp new specs style" do
- sockets = event_count.times.map do |i|
+ before do
+ srand(RSpec.configuration.seed)
+ end
+
+ let(:port) { rand(1024..65535) }
+ subject { LogStash::Plugin.lookup("input", "tcp").new({ "port" => port }) }
+
+ after :each do
+ subject.close rescue nil
+ end
+
+ describe "register" do
+ it "should register without errors" do
+ expect { subject.register }.to_not raise_error
+ end
+ end
+
+ describe "receive" do
+
+ let(:nevents) { 10 }
+
+ let(:events) do
socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) }
- socket.puts("#{i}")
- socket.flush
- socket
+
+ result = pipelineless_input(subject, nevents) do
+ nevents.times do |i|
+ socket.puts("msg #{i}")
+ socket.flush
+ end
+ end
+
+ socket.close rescue nil
+
+ result
end
- client_threads = inputs[0].instance_variable_get("@client_threads")
+ before(:each) do
+ subject.register
+ end
- # close all sockets and make sure there is not more pending threads
- sockets.each{|socket| socket.close}
+ it "should receive events been generated" do
+ expect(events.size).to be(nevents)
+ messages = events.map { |event| event["message"]}
+ messages.each do |message|
+ expect(message).to match(/msg \d+/)
+ end
+ end
- Timeout.timeout(1) {sleep 0.1 while client_threads.size > 0}
- insist { client_threads.size } == 0 # this check is actually useless per previous line
end
+
+ it_behaves_like "an interruptible input plugin" do
+ let(:config) { { "port" => port } }
+ end
end
+
end