require "socket"
require "json"
require "time"
require "helper"
require "fluent/test/driver/input"
require "fluent/plugin/in_unix_client.rb"
require_relative "./unix_server.rb"
class UnixClientInputTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
@server_thread = nil
end
def teardown
stop_server
FileUtils.rm_rf(TMP_DIR)
end
TMP_DIR = File.dirname(__FILE__) + "/../tmp/socket"
BASE_CONFIG = %[
@type unix_client
tag unix_client
path #{TMP_DIR}/socket.sock
]
DEFAULT_MSG = "Hello world."
def test_configure
d = create_driver(config_with_json_parser)
assert_equal "unix_client", d.instance.tag
assert_equal "#{TMP_DIR}/socket.sock", d.instance.path
assert_equal false, d.instance.format_json
end
def test_configure_with_format_json
d = create_driver(config_with_json_parser_and_format_json)
assert_equal "unix_client", d.instance.tag
assert_equal "#{TMP_DIR}/socket.sock", d.instance.path
assert_equal true, d.instance.format_json
end
def test_receive_json
d = create_driver(config_with_json_parser)
path = d.instance.path
start_server(path)
cur_time = Time.now.to_i
d.run(expect_records: 1, timeout: 10) do
sleep 1
send_json(path, time: cur_time)
end
assert_equal 1, d.events.length
d.events.each do |tag, time, record|
assert_equal "unix_client", tag
assert_equal cur_time, time
assert_equal DEFAULT_MSG, record["msg"]
end
end
def test_receive_json_with_custom_delimiter_newline
d = create_driver(config_with_json_parser_and_delimiter('\n'))
path = d.instance.path
start_server(path)
cur_time = Time.now.to_i
d.run(expect_records: 1, timeout: 10) do
sleep 1
send_json(path, time: cur_time, delimiter: "\n")
end
assert_equal 1, d.events.length
d.events.each do |tag, time, record|
assert_equal "unix_client", tag
assert_equal cur_time, time
assert_equal DEFAULT_MSG, record["msg"]
end
end
def test_receive_json_with_custom_delimiter_tab
d = create_driver(config_with_json_parser_and_delimiter('\t'))
path = d.instance.path
start_server(path)
cur_time = Time.now.to_i
d.run(expect_records: 1, timeout: 10) do
sleep 1
send_json(path, time: cur_time, delimiter: "\t")
end
assert_equal 1, d.events.length
d.events.each do |tag, time, record|
assert_equal "unix_client", tag
assert_equal cur_time, time
assert_equal DEFAULT_MSG, record["msg"]
end
end
def test_receive_json_list
d = create_driver(config_with_json_parser_and_format_json)
path = d.instance.path
delimiter = "\n"
msgs = ["hoge", "fuga", "foo"]
start_server(path)
d.run(expect_records: 3, timeout: 10) do
sleep 1
UNIXSocket.open(path) do |sock|
sock.write("[")
sock.write(JSON.generate(raw_data(msg: msgs[0])))
sock.write(",")
sock.write(delimiter)
sock.write(JSON.generate(raw_data(msg: msgs[1])))
sock.write(",")
sock.write(delimiter)
sock.write(JSON.generate(raw_data(msg: msgs[2])))
sock.write(delimiter)
sock.write("]")
sock.write(delimiter)
end
end
assert_equal 3, d.events.length
d.events.each_with_index do |event, i|
assert_equal msgs[i], event[2]["msg"]
end
end
def test_receive_json_list_with_one_delimiter
d = create_driver(config_with_json_parser_and_format_json)
path = d.instance.path
delimiter = "\n"
msgs = ["hoge", "fuga", "foo"]
start_server(path)
d.run(expect_records: 3, timeout: 10) do
sleep 1
data = msgs.map {|msg| raw_data(msg: msg)}
UNIXSocket.open(path) do |sock|
sock.write(JSON.generate(data))
sock.write(delimiter)
end
end
assert_equal 3, d.events.length
d.events.each_with_index do |event, i|
assert_equal msgs[i], event[2]["msg"]
end
end
private
def create_driver(conf)
Fluent::Test::Driver::Input.new(Fluent::Plugin::UnixClientInput).configure(conf)
end
def config_with_json_parser
BASE_CONFIG + %!
@type json
!
end
def config_with_json_parser_and_delimiter(delimiter)
BASE_CONFIG + %[
delimiter \"#{delimiter}\"
] + %!
@type json
!
end
def config_with_json_parser_and_format_json
BASE_CONFIG + %!
format_json true
@type json
!
end
def start_server(path)
@server_thread = Thread.new do
server = UnixBroadcastServer.new(path)
server.run
end
sleep 1
end
def stop_server
if @server_thread
@server_thread.kill if @server_thread
@server_thread.join
@server_thread = nil
end
end
def send_json(path, time: nil, msg: DEFAULT_MSG, delimiter: "\n")
msg = JSON.generate(raw_data(time: time, msg: msg))
UNIXSocket.open(path) do |sock|
sock.write(msg)
sock.write(delimiter)
end
end
def raw_data(time: nil, msg: DEFAULT_MSG)
{
"time" => time.nil? ? Time.now.to_i : time,
"msg" => msg
}
end
end