spec/inputs/tcp_spec.rb in logstash-input-tcp-0.1.3 vs spec/inputs/tcp_spec.rb in logstash-input-tcp-0.1.4

- old
+ new

@@ -5,276 +5,244 @@ require "logstash/json" require "logstash/inputs/tcp" require 'stud/try' describe LogStash::Inputs::Tcp do - + context "codec (PR #1372)" do it "switches from plain to line" do require "logstash/codecs/plain" require "logstash/codecs/line" plugin = LogStash::Inputs::Tcp.new("codec" => LogStash::Codecs::Plain.new, "port" => 0) plugin.register insist { plugin.codec }.is_a?(LogStash::Codecs::Line) + plugin.teardown end it "switches from json to json_lines" do require "logstash/codecs/json" require "logstash/codecs/json_lines" plugin = LogStash::Inputs::Tcp.new("codec" => LogStash::Codecs::JSON.new, "port" => 0) plugin.register insist { plugin.codec }.is_a?(LogStash::Codecs::JSONLines) + plugin.teardown end end - describe "read plain with unicode", :socket => true do + it "should read plain with unicode" do event_count = 10 port = 5511 - config <<-CONFIG + conf = <<-CONFIG input { tcp { port => #{port} } } CONFIG - input do |pipeline, queue| - Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? + + events = input(conf) do |pipeline, queue| socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } event_count.times do |i| # unicode smiley for testing unicode support! socket.puts("#{i} ☹") + socket.flush end socket.close - # wait till all events have been processed - Timeout.timeout(1) {sleep 0.1 while queue.size < event_count} + event_count.times.collect {queue.pop} + end - events = event_count.times.collect { queue.pop } - event_count.times do |i| - insist { events[i]["message"] } == "#{i} ☹" - end - end # input + insist { events.length } == event_count + event_count.times do |i| + insist { events[i]["message"] } == "#{i} ☹" + end end - describe "read events with plain codec and ISO-8859-1 charset" do + it "should read events with plain codec and ISO-8859-1 charset" do port = 5513 charset = "ISO-8859-1" - config <<-CONFIG + conf = <<-CONFIG input { tcp { port => #{port} codec => plain { charset => "#{charset}" } } } CONFIG - input do |pipeline, queue| - Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? - + event = input(conf) do |pipeline, queue| socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } text = "\xA3" # the £ symbol in ISO-8859-1 aka Latin-1 text.force_encoding("ISO-8859-1") socket.puts(text) socket.close - # wait till all events have been processed - Timeout.timeout(1) {sleep 0.1 while queue.size < 1} + queue.pop + end - event = queue.pop - # Make sure the 0xA3 latin-1 code converts correctly to UTF-8. - pending("charset conv broken") do - insist { event["message"].size } == 1 - insist { event["message"].bytesize } == 2 - insist { event["message"] } == "£" - end - end # input + # Make sure the 0xA3 latin-1 code converts correctly to UTF-8. + insist { event["message"].size } == 1 + insist { event["message"].bytesize } == 2 + insist { event["message"] } == "£" end - describe "read events with json codec" do + it "should read events with json codec" do port = 5514 - config <<-CONFIG + conf = <<-CONFIG input { tcp { port => #{port} codec => json } } CONFIG - input do |pipeline, queue| - Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? + data = { + "hello" => "world", + "foo" => [1,2,3], + "baz" => { "1" => "2" }, + "host" => "example host" + } - data = { - "hello" => "world", - "foo" => [1,2,3], - "baz" => { "1" => "2" }, - "host" => "example host" - } - + event = input(conf) do |pipeline, queue| socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } socket.puts(LogStash::Json.dump(data)) socket.close - # wait till all events have been processed - Timeout.timeout(1) {sleep 0.1 while queue.size < 1} + queue.pop + end - event = queue.pop - insist { event["hello"] } == data["hello"] - insist { event["foo"].to_a } == data["foo"] # to_a to cast Java ArrayList produced by JrJackson - insist { event["baz"] } == data["baz"] + insist { event["hello"] } == data["hello"] + insist { event["foo"].to_a } == data["foo"] # to_a to cast Java ArrayList produced by JrJackson + insist { event["baz"] } == data["baz"] - # Make sure the tcp input, w/ json codec, uses the event's 'host' value, - # if present, instead of providing its own - insist { event["host"] } == data["host"] - end # input + # Make sure the tcp input, w/ json codec, uses the event's 'host' value, + # if present, instead of providing its own + insist { event["host"] } == data["host"] end - describe "read events with json codec (testing 'host' handling)" do + it "should read events with json codec (testing 'host' handling)" do port = 5514 - config <<-CONFIG + conf = <<-CONFIG input { tcp { port => #{port} codec => json } } CONFIG - input do |pipeline, queue| - Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? + data = { + "hello" => "world" + } - data = { - "hello" => "world" - } - + event = input(conf) do |pipeline, queue| socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } socket.puts(LogStash::Json.dump(data)) socket.close - # wait till all events have been processed - Timeout.timeout(1) {sleep 0.1 while queue.size < 1} + queue.pop + end - event = queue.pop - insist { event["hello"] } == data["hello"] - insist { event }.include?("host") - end # input + insist { event["hello"] } == data["hello"] + insist { event }.include?("host") end - describe "read events with json_lines codec" do + it "should read events with json_lines codec" do port = 5515 - config <<-CONFIG + conf = <<-CONFIG input { tcp { port => #{port} codec => json_lines } } CONFIG - input do |pipeline, queue| - Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? + data = { + "hello" => "world", + "foo" => [1,2,3], + "baz" => { "1" => "2" }, + "idx" => 0 + } - data = { - "hello" => "world", - "foo" => [1,2,3], - "baz" => { "1" => "2" }, - "idx" => 0 - } - + events = input(conf) do |pipeline, queue| socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } (1..5).each do |idx| data["idx"] = idx socket.puts(LogStash::Json.dump(data) + "\n") - end # do + end socket.close - (1..5).each do |idx| - event = queue.pop - insist { event["hello"] } == data["hello"] - insist { event["foo"].to_a } == data["foo"] # to_a to cast Java ArrayList produced by JrJackson - insist { event["baz"] } == data["baz"] - insist { event["idx"] } == idx - end # do - end # input + (1..5).map{queue.pop} + end + + events.each_with_index do |event, idx| + insist { event["hello"] } == data["hello"] + insist { event["foo"].to_a } == data["foo"] # to_a to cast Java ArrayList produced by JrJackson + insist { event["baz"] } == data["baz"] + insist { event["idx"] } == idx + 1 + end # do end # describe - describe "one message per connection" do + it "should one message per connection" do event_count = 10 port = 5516 - config <<-CONFIG + conf = <<-CONFIG input { tcp { port => #{port} } } CONFIG - input do |pipeline, queue| - Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? - + events = input(conf) do |pipeline, queue| event_count.times do |i| socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } socket.puts("#{i}") socket.flush socket.close end - - # wait till all events have been processed - Timeout.timeout(1) {sleep 0.1 while queue.size < event_count} # since each message is sent on its own tcp connection & thread, exact receiving order cannot be garanteed - events = event_count.times.collect{queue.pop}.sort_by{|event| event["message"]} + event_count.times.collect{queue.pop}.sort_by{|event| event["message"]} + end - event_count.times do |i| - insist { events[i]["message"] } == "#{i}" - end - end # input + event_count.times do |i| + insist { events[i]["message"] } == "#{i}" + end end - describe "connection threads are cleaned up when connection is closed" do + it "should connection threads are cleaned up when connection is closed" do event_count = 10 port = 5517 - config <<-CONFIG + conf = <<-CONFIG input { tcp { port => #{port} } } CONFIG - input do |pipeline, queue| - Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? - + input(conf) do |pipeline, queue| inputs = pipeline.instance_variable_get("@inputs") insist { inputs.size } == 1 sockets = event_count.times.map do |i| socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } socket.puts("#{i}") socket.flush socket end - # wait till all events have been processed - Timeout.timeout(1) {sleep 0.1 while queue.size < event_count} - - # we should have "event_count" pending threads since sockets were not closed yet client_threads = inputs[0].instance_variable_get("@client_threads") - insist { client_threads.size } == event_count # close all sockets and make sure there is not more pending threads sockets.each{|socket| socket.close} + Timeout.timeout(1) {sleep 0.1 while client_threads.size > 0} insist { client_threads.size } == 0 # this check is actually useless per previous line - - end # input + end end end