require "socket" require "json" require "time" require "helper" require "fluent/test/driver/input" require "fluent/plugin/in_unix_client.rb" require_relative "./unix_server.rb" class UnixClientInputTest < Test::Unit::TestCase def setup Fluent::Test.setup @thread = nil end def teardown @thread.kill if @thread @thread = nil FileUtils.rm_rf(TMP_DIR) end TMP_DIR = File.dirname(__FILE__) + "/../tmp/socket" BASE_CONFIG = %[ @type unix_client tag unix_client path #{TMP_DIR}/socket.sock ] DEFAULT_MSG = "Hello world." def test_configure d = create_driver(config_with_json_parser) assert_equal "unix_client", d.instance.tag assert_equal "#{TMP_DIR}/socket.sock", d.instance.path end def test_receive_json d = create_driver(config_with_json_parser) path = d.instance.path start_server(path) cur_time = Time.now.to_i d.run(expect_records: 1, timeout: 10) do sleep 1 send_json(path, time: cur_time) end assert_equal 1, d.events.length d.events.each do |tag, time, record| assert_equal "unix_client", tag assert_equal cur_time, time assert_equal DEFAULT_MSG, record["msg"] end end def test_receive_json_with_custom_delimiter_newline d = create_driver(config_with_json_parser_and_delimiter('\n')) path = d.instance.path start_server(path) cur_time = Time.now.to_i d.run(expect_records: 1, timeout: 10) do sleep 1 send_json(path, time: cur_time, delimiter: "\n") end assert_equal 1, d.events.length d.events.each do |tag, time, record| assert_equal "unix_client", tag assert_equal cur_time, time assert_equal DEFAULT_MSG, record["msg"] end end def test_receive_json_with_custom_delimiter_tab d = create_driver(config_with_json_parser_and_delimiter('\t')) path = d.instance.path start_server(path) cur_time = Time.now.to_i d.run(expect_records: 1, timeout: 10) do sleep 1 send_json(path, time: cur_time, delimiter: "\t") end assert_equal 1, d.events.length d.events.each do |tag, time, record| assert_equal "unix_client", tag assert_equal cur_time, time assert_equal DEFAULT_MSG, record["msg"] end end private def create_driver(conf) Fluent::Test::Driver::Input.new(Fluent::Plugin::UnixClientInput).configure(conf) end def config_with_json_parser BASE_CONFIG + %! @type json ! end def config_with_json_parser_and_delimiter(delimiter) BASE_CONFIG + %[ delimiter \"#{delimiter}\" ] + %! @type json ! end def start_server(path) @thread = Thread.new do server = UnixBroadcastServer.new(path) server.run end sleep 1 end def send_json(path, time: nil, msg: DEFAULT_MSG, delimiter: "\n") msg = JSON.generate( { "time" => time.nil? ? Time.now.to_i : time, "msg" => msg } ) UNIXSocket.open(path) do |sock| sock.write(msg) sock.write(delimiter) end end end