require 'fluent/test' require 'helper' class ForwardInputTest < Test::Unit::TestCase def setup Fluent::Test.setup end PORT = unused_port CONFIG = %[ port #{PORT} bind 127.0.0.1 ] def create_driver(conf=CONFIG) Fluent::Test::InputTestDriver.new(Fluent::ForwardInput).configure(conf) end def test_configure d = create_driver assert_equal PORT, d.instance.port assert_equal '127.0.0.1', d.instance.bind assert_equal 0, d.instance.linger_timeout assert_equal 0.5, d.instance.blocking_timeout assert !d.instance.backlog end # TODO: Will add Loop::run arity check with stub/mock library def connect TCPSocket.new('127.0.0.1', PORT) end def test_time d = create_driver time = Time.parse("2011-01-02 13:14:15 UTC").to_i Fluent::Engine.now = time d.expect_emit "tag1", time, {"a"=>1} d.expect_emit "tag2", time, {"a"=>2} d.run do d.expected_emits.each {|tag,time,record| send_data [tag, 0, record].to_msgpack } sleep 0.5 end end def test_message d = create_driver time = Time.parse("2011-01-02 13:14:15 UTC").to_i d.expect_emit "tag1", time, {"a"=>1} d.expect_emit "tag2", time, {"a"=>2} d.run do d.expected_emits.each {|tag,time,record| send_data [tag, time, record].to_msgpack } sleep 0.5 end end def test_forward d = create_driver time = Time.parse("2011-01-02 13:14:15 UTC").to_i d.expect_emit "tag1", time, {"a"=>1} d.expect_emit "tag1", time, {"a"=>2} d.run do entries = [] d.expected_emits.each {|tag,time,record| entries << [time, record] } send_data ["tag1", entries].to_msgpack sleep 0.5 end end def test_packed_forward d = create_driver time = Time.parse("2011-01-02 13:14:15 UTC").to_i d.expect_emit "tag1", time, {"a"=>1} d.expect_emit "tag1", time, {"a"=>2} d.run do entries = '' d.expected_emits.each {|tag,time,record| [time, record].to_msgpack(entries) } send_data ["tag1", entries].to_msgpack sleep 0.5 end end def test_message_json d = create_driver time = Time.parse("2011-01-02 13:14:15 UTC").to_i d.expect_emit "tag1", time, {"a"=>1} d.expect_emit "tag2", time, {"a"=>2} d.run do d.expected_emits.each {|tag,time,record| send_data [tag, time, record].to_json } sleep 0.5 end end def test_send_large_chunk_warning d = create_driver(CONFIG + %[ chunk_size_warn_limit 16M chunk_size_limit 32M ]) time = Time.parse("2014-04-25 13:14:15 UTC").to_i # generate over 16M chunk str = "X" * 1024 * 1024 chunk = [ "test.tag", (0...16).map{|i| [time + i, {"data" => str}] } ].to_msgpack assert chunk.size > (16 * 1024 * 1024) assert chunk.size < (32 * 1024 * 1024) d.run do MessagePack::Unpacker.new.feed_each(chunk) do |obj| d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") end end # check emitted data emits = d.emits assert_equal 16, emits.size assert emits.map(&:first).all?{|t| t == "test.tag" } assert_equal (0...16).to_a, emits.map{|tag, t, record| t - time } # check log assert d.instance.log.logs.select{|line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_warn_limit':/ && line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=16777216 size=16777501/ }.size == 1, "large chunk warning is not logged" end def test_send_large_chunk_only_warning d = create_driver(CONFIG + %[ chunk_size_warn_limit 16M ]) time = Time.parse("2014-04-25 13:14:15 UTC").to_i # generate over 16M chunk str = "X" * 1024 * 1024 chunk = [ "test.tag", (0...16).map{|i| [time + i, {"data" => str}] } ].to_msgpack d.run do MessagePack::Unpacker.new.feed_each(chunk) do |obj| d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") end end # check log assert d.instance.log.logs.select{ |line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_warn_limit':/ && line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=16777216 size=16777501/ }.size == 1, "large chunk warning is not logged" end def test_send_large_chunk_limit d = create_driver(CONFIG + %[ chunk_size_warn_limit 16M chunk_size_limit 32M ]) time = Time.parse("2014-04-25 13:14:15 UTC").to_i # generate over 32M chunk str = "X" * 1024 * 1024 chunk = [ "test.tag", (0...32).map{|i| [time + i, {"data" => str}] } ].to_msgpack assert chunk.size > (32 * 1024 * 1024) # d.run => send_data d.run do MessagePack::Unpacker.new.feed_each(chunk) do |obj| d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") end end # check emitted data emits = d.emits assert_equal 0, emits.size # check log assert d.instance.log.logs.select{|line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_limit', dropped:/ && line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=33554432 size=33554989/ }.size == 1, "large chunk warning is not logged" end def send_data(data) io = connect begin io.write data ensure io.close end end # TODO heartbeat end