require_relative 'helper'
require 'fluent/event_router'
require 'fluent/system_config'
require 'fluent/supervisor'
require_relative 'test_plugin_classes'

require 'net/http'
require 'uri'
require 'fileutils'
require 'tempfile'

if Fluent.windows?
  require 'win32/event'
end

# Tests for Fluent::Supervisor and Fluent::ServerModule: system config
# parsing (classic and YAML syntax), signal / Windows event handlers,
# the RPC server, config (re)loading and logger rotation options.
class SupervisorTest < ::Test::Unit::TestCase
  # Minimal host object for Fluent::ServerModule. Individual tests override
  # `config` with a singleton method when they need specific settings.
  class DummyServer
    include Fluent::ServerModule
    attr_accessor :rpc_endpoint, :enable_get_dump
    def config
      {}
    end
  end

  TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/tmp/supervisor#{ENV['TEST_ENV_NUMBER']}")
  TMP_ROOT_DIR = File.join(TMP_DIR, 'root')

  # Start every test from an empty temporary directory.
  def setup
    FileUtils.rm_rf(TMP_DIR)
    FileUtils.mkdir_p(TMP_DIR)
  end

  # Writes +data+ to +path+, creating the parent directory if needed.
  def write_config(path, data)
    FileUtils.mkdir_p(File.dirname(path))
    File.open(path, "w") {|f| f.write data }
  end

  def test_system_config
    opts = Fluent::Supervisor.default_options
    sv = Fluent::Supervisor.new(opts)
    # The section layout mirrors the YAML variant of this test below
    # (system -> log / counter_server / counter_client).
    conf_data = <<-EOC
<system>
  rpc_endpoint 127.0.0.1:24445
  suppress_repeated_stacktrace true
  suppress_config_dump true
  without_source true
  enable_get_dump true
  process_name "process_name"
  log_level info
  root_dir #{TMP_ROOT_DIR}
  <log>
    format json
    time_format %Y
  </log>
  <counter_server>
    bind 127.0.0.1
    port 24321
    scope server1
    backup_path /tmp/backup
  </counter_server>
  <counter_client>
    host 127.0.0.1
    port 24321
    timeout 2
  </counter_client>
</system>
    EOC
    conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true)
    sys_conf = sv.__send__(:build_system_config, conf)

    assert_equal '127.0.0.1:24445', sys_conf.rpc_endpoint
    assert_equal true, sys_conf.suppress_repeated_stacktrace
    assert_equal true, sys_conf.suppress_config_dump
    assert_equal true, sys_conf.without_source
    assert_equal true, sys_conf.enable_get_dump
    assert_equal "process_name", sys_conf.process_name
    assert_equal 2, sys_conf.log_level
    assert_equal TMP_ROOT_DIR, sys_conf.root_dir
    assert_equal :json, sys_conf.log.format
    assert_equal '%Y', sys_conf.log.time_format
    counter_server = sys_conf.counter_server
    assert_equal '127.0.0.1', counter_server.bind
    assert_equal 24321, counter_server.port
    assert_equal 'server1', counter_server.scope
    assert_equal '/tmp/backup', counter_server.backup_path
    counter_client = sys_conf.counter_client
    assert_equal '127.0.0.1', counter_client.host
    assert_equal 24321, counter_client.port
    assert_equal 2, counter_client.timeout
  end

  sub_test_case "yaml config" do
    # Writes +yaml+ to a temporary file, loads it with the YAML config
    # loader and returns the resulting config element.
    def parse_yaml(yaml)
      context = Kernel.binding

      config = nil
      Tempfile.open do |file|
        file.puts(yaml)
        file.flush
        s = Fluent::Config::YamlParser::Loader.new(context).load(Pathname.new(file))
        config = Fluent::Config::YamlParser::Parser.new(s).build.to_element
      end
      config
    end

    def test_system_config
      opts = Fluent::Supervisor.default_options
      sv = Fluent::Supervisor.new(opts)
      conf_data = <<-EOC
      system:
        rpc_endpoint: 127.0.0.1:24445
        suppress_repeated_stacktrace: true
        suppress_config_dump: true
        without_source: true
        enable_get_dump: true
        process_name: "process_name"
        log_level: info
        root_dir: !fluent/s "#{TMP_ROOT_DIR}"
        log:
          format: json
          time_format: "%Y"
        counter_server:
          bind: 127.0.0.1
          port: 24321
          scope: server1
          backup_path: /tmp/backup
        counter_client:
          host: 127.0.0.1
          port: 24321
          timeout: 2
      EOC
      conf = parse_yaml(conf_data)
      sys_conf = sv.__send__(:build_system_config, conf)

      counter_client = sys_conf.counter_client
      counter_server = sys_conf.counter_server
      assert_equal(
        [
          '127.0.0.1:24445',
          true,
          true,
          true,
          true,
          "process_name",
          2,
          TMP_ROOT_DIR,
          :json,
          '%Y',
          '127.0.0.1',
          24321,
          'server1',
          '/tmp/backup',
          '127.0.0.1',
          24321,
          2,
        ],
        [
          sys_conf.rpc_endpoint,
          sys_conf.suppress_repeated_stacktrace,
          sys_conf.suppress_config_dump,
          sys_conf.without_source,
          sys_conf.enable_get_dump,
          sys_conf.process_name,
          sys_conf.log_level,
          sys_conf.root_dir,
          sys_conf.log.format,
          sys_conf.log.time_format,
          counter_server.bind,
          counter_server.port,
          counter_server.scope,
          counter_server.backup_path,
          counter_client.host,
          counter_client.port,
          counter_client.timeout,
        ])
    end
  end

  def test_main_process_signal_handlers
    omit "Windows cannot handle signals" if Fluent.windows?

    create_info_dummy_logger

    opts = Fluent::Supervisor.default_options
    sv = Fluent::Supervisor.new(opts)
    sv.send(:install_main_process_signal_handlers)

    begin
      Process.kill :USR1, $$
    rescue
    end

    sleep 1

    info_msg = '[info]: force flushing buffered events' + "\n"
    assert{ $log.out.logs.first.end_with?(info_msg) }
  ensure
    $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset)
  end

  def test_main_process_command_handlers
    omit "Only for Windows, alternative to UNIX signals" unless Fluent.windows?

    create_info_dummy_logger

    opts = Fluent::Supervisor.default_options
    sv = Fluent::Supervisor.new(opts)
    # Feed the command through a pipe standing in for $stdin.
    r, w = IO.pipe
    $stdin = r
    sv.send(:install_main_process_signal_handlers)

    begin
      w.write("GRACEFUL_RESTART\n")
      w.flush
    ensure
      $stdin = STDIN
    end

    sleep 1

    info_msg = '[info]: force flushing buffered events' + "\n"
    assert{ $log.out.logs.first.end_with?(info_msg) }
  ensure
    $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset)
  end

  def test_supervisor_signal_handler
    omit "Windows cannot handle signals" if Fluent.windows?

    create_debug_dummy_logger

    server = DummyServer.new
    server.install_supervisor_signal_handlers
    begin
      Process.kill :USR1, $$
    rescue
    end

    sleep 1

    debug_msg = '[debug]: fluentd supervisor process get SIGUSR1'
    logs = $log.out.logs
    assert{ logs.any?{|log| log.include?(debug_msg) } }
  ensure
    $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset)
  end

  def test_windows_shutdown_event
    omit "Only for Windows platform" unless Fluent.windows?

    # Install a dummy logger explicitly so the assertion on $log.out.logs
    # does not depend on whichever test happened to run before this one.
    create_debug_dummy_logger

    server = DummyServer.new
    def server.config
      {:signame => "TestFluentdEvent"}
    end

    mock(server).stop(true)
    stub(Process).kill.times(0)

    server.install_windows_event_handler
    begin
      sleep 0.1 # Wait for starting windows event thread
      event = Win32::Event.open("TestFluentdEvent")
      event.set
      event.close
    ensure
      server.stop_windows_event_thread
    end

    debug_msg = '[debug]: Got Win32 event "TestFluentdEvent"'
    logs = $log.out.logs
    assert{ logs.any?{|log| log.include?(debug_msg) } }
  ensure
    $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset)
  end

  def test_supervisor_event_handler
    omit "Only for Windows, alternative to UNIX signals" unless Fluent.windows?

    create_debug_dummy_logger

    server = DummyServer.new
    def server.config
      {:signame => "TestFluentdEvent"}
    end
    server.install_windows_event_handler
    begin
      sleep 0.1 # Wait for starting windows event thread
      event = Win32::Event.open("TestFluentdEvent_USR1")
      event.set
      event.close
    ensure
      server.stop_windows_event_thread
    end

    debug_msg = '[debug]: Got Win32 event "TestFluentdEvent_USR1"'
    logs = $log.out.logs
    assert{ logs.any?{|log| log.include?(debug_msg) } }
  ensure
    $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset)
  end

  data("Normal", {raw_path: "C:\\Windows\\Temp\\sigdump.log", expected: "C:\\Windows\\Temp\\sigdump-#{$$}.log"})
  data("UNIX style", {raw_path: "/Windows/Temp/sigdump.log", expected: "/Windows/Temp/sigdump-#{$$}.log"})
  data("No extension", {raw_path: "C:\\Windows\\Temp\\sigdump", expected: "C:\\Windows\\Temp\\sigdump-#{$$}"})
  data("Multi-extension", {raw_path: "C:\\Windows\\Temp\\sig.dump.bk", expected: "C:\\Windows\\Temp\\sig.dump-#{$$}.bk"})
  def test_fluentsigdump_get_path_with_pid(data)
    path = Fluent::FluentSigdump.get_path_with_pid(data[:raw_path])
    assert_equal(data[:expected], path)
  end

  def test_supervisor_event_dump_windows
    omit "Only for Windows, alternative to UNIX signals" unless Fluent.windows?

    server = DummyServer.new
    def server.config
      {:signame => "TestFluentdEvent"}
    end
    server.install_windows_event_handler

    assert_rr do
      # Have to use mock because `Sigdump.dump` seems to be somehow incompatible with RR.
      # The `mock(server).restart(true) { nil }` line in `test_rpc_server_windows` cause the next error.
      # Failure: test_supervisor_event_dump_windows(SupervisorTest):
      #   class()
      #   Called 0 times.
      #   Expected 1 times.
      # .../Ruby26-x64/lib/ruby/gems/2.6.0/gems/sigdump-0.2.4/lib/sigdump.rb:74:in `block in dump_object_count'
      #      73:     ObjectSpace.each_object {|o|
      #      74:       c = o.class <-- HERE!
      mock(Sigdump).dump(anything)

      begin
        sleep 0.1 # Wait for starting windows event thread
        event = Win32::Event.open("TestFluentdEvent_CONT")
        event.set
        event.close
        sleep 1.0 # Wait for dumping
      ensure
        server.stop_windows_event_thread
      end
    end
  end

  # Each data entry is [bind address, address to connect to, needs IPv6].
  data(:ipv4 => ["0.0.0.0", "127.0.0.1", false],
       :ipv6 => ["[::]", "[::1]", true],
       :localhost_ipv4 => ["localhost", "127.0.0.1", false])
  def test_rpc_server(data)
    omit "Windows cannot handle signals" if Fluent.windows?

    bindaddr, localhost, ipv6 = data
    omit "IPv6 is not supported on this environment" if ipv6 && !ipv6_enabled?

    create_info_dummy_logger

    opts = Fluent::Supervisor.default_options
    sv = Fluent::Supervisor.new(opts)
    conf_data = <<-EOC
  <system>
    rpc_endpoint "#{bindaddr}:24447"
  </system>
    EOC
    conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true)
    sys_conf = sv.__send__(:build_system_config, conf)

    server = DummyServer.new
    server.rpc_endpoint = sys_conf.rpc_endpoint
    server.enable_get_dump = sys_conf.enable_get_dump

    server.run_rpc_server
    begin
      sv.send(:install_main_process_signal_handlers)
      response = Net::HTTP.get(URI.parse("http://#{localhost}:24447/api/plugins.flushBuffers"))
    ensure
      # Always stop the server so a failed request does not leave the
      # port bound for the remaining data variants.
      server.stop_rpc_server
    end
    info_msg = '[info]: force flushing buffered events' + "\n"

    # In TravisCI with OSX(Xcode), it seems that can't use rpc server.
    # This test will be passed in such environment.
    pend unless $log.out.logs.first

    assert_equal('{"ok":true}', response)
    assert{ $log.out.logs.first.end_with?(info_msg) }
  ensure
    $log.out.reset if $log.out.is_a?(Fluent::Test::DummyLogDevice)
  end

  data(:no_port => ["127.0.0.1"],
       :invalid_addr => ["*:24447"])
  def test_invalid_rpc_endpoint(data)
    endpoint = data[0]

    opts = Fluent::Supervisor.default_options
    sv = Fluent::Supervisor.new(opts)
    conf_data = <<-EOC
  <system>
    rpc_endpoint "#{endpoint}"
  </system>
    EOC
    conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true)
    sys_conf = sv.__send__(:build_system_config, conf)

    server = DummyServer.new
    server.rpc_endpoint = sys_conf.rpc_endpoint

    assert_raise(Fluent::ConfigError.new("Invalid rpc_endpoint: #{endpoint}")) do
      server.run_rpc_server
    end
  end

  # Each data entry is [bind address, address to connect to, needs IPv6].
  # NOTE(review): :localhost_ipv4 is flagged as needing IPv6 here but not in
  # test_rpc_server -- confirm whether that is intentional on Windows.
  data(:ipv4 => ["0.0.0.0", "127.0.0.1", false],
       :ipv6 => ["[::]", "[::1]", true],
       :localhost_ipv4 => ["localhost", "127.0.0.1", true])
  def test_rpc_server_windows(data)
    omit "Only for windows platform" unless Fluent.windows?

    bindaddr, localhost, ipv6 = data
    omit "IPv6 is not supported on this environment" if ipv6 && !ipv6_enabled?

    create_info_dummy_logger

    opts = Fluent::Supervisor.default_options
    sv = Fluent::Supervisor.new(opts)
    conf_data = <<-EOC
  <system>
    rpc_endpoint "#{bindaddr}:24447"
  </system>
    EOC
    conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true)
    sys_conf = sv.__send__(:build_system_config, conf)

    server = DummyServer.new
    def server.config
      {
        :signame => "TestFluentdEvent",
        :worker_pid => 5963,
      }
    end
    server.rpc_endpoint = sys_conf.rpc_endpoint

    server.run_rpc_server
    begin
      mock(server).restart(true) { nil }
      response = Net::HTTP.get(URI.parse("http://#{localhost}:24447/api/plugins.flushBuffers"))
    ensure
      # Always stop the server so a failed request does not leave the
      # port bound for the remaining data variants.
      server.stop_rpc_server
    end

    assert_equal('{"ok":true}', response)
  end

  def test_load_config
    tmp_dir = "#{TMP_DIR}/dir/test_load_config.conf"
    conf_info_str = %[
<system>
  log_level info
</system>
]
    conf_debug_str = %[
<system>
  log_level debug
</system>
]
    now = Time.now
    Timecop.freeze(now)

    write_config tmp_dir, conf_info_str

    params = {}
    params['workers'] = 1
    params['use_v1_config'] = true
    params['log_path'] = 'test/tmp/supervisor/log'
    params['suppress_repeated_stacktrace'] = true
    params['log_level'] = Fluent::Log::LEVEL_INFO
    params['conf_encoding'] = 'utf-8'
    load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_dir, params) }

    # first call
    se_config = load_config_proc.call
    assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
    assert_equal true, se_config[:suppress_repeated_stacktrace]
    assert_equal 'spawn', se_config[:worker_type]
    assert_equal 1, se_config[:workers]
    assert_equal false, se_config[:log_stdin]
    assert_equal false, se_config[:log_stdout]
    assert_equal false, se_config[:log_stderr]
    assert_equal true, se_config[:enable_heartbeat]
    assert_equal false, se_config[:auto_heartbeat]
    assert_equal false, se_config[:daemonize]
    assert_nil se_config[:pid_path]

    # second call immediately(reuse config)
    se_config = load_config_proc.call
    pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
    pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
    assert_nil pre_config_mtime
    assert_nil pre_loadtime

    Timecop.freeze(now + 5)

    # third call after 5 seconds(don't reuse config)
    se_config = load_config_proc.call
    pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
    pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
    assert_not_nil pre_config_mtime
    assert_not_nil pre_loadtime

    # fourth call immediately(reuse config)
    se_config = load_config_proc.call
    # test that pre_config_mtime and pre_loadtime are not changed from previous one because reused pre_config
    assert_equal pre_config_mtime, se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
    assert_equal pre_loadtime, se_config[:windows_daemon_cmdline][5]['pre_loadtime']

    write_config tmp_dir, conf_debug_str

    # fifth call after changed conf file(don't reuse config)
    se_config = load_config_proc.call
    assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
  ensure
    Timecop.return
  end

  def test_load_config_for_logger
    tmp_dir = "#{TMP_DIR}/dir/test_load_config_log.conf"
    conf_info_str = %[
<system>
  <log>
    format json
    time_format %FT%T.%L%z
  </log>
</system>
]
    write_config tmp_dir, conf_info_str
    params = {
      'use_v1_config' => true,
      'conf_encoding' => 'utf8',
      'log_level' => Fluent::Log::LEVEL_INFO,
      'log_path' => 'test/tmp/supervisor/log',

      'workers' => 1,
      'log_format' => :json,
      'log_time_format' => '%FT%T.%L%z',
    }

    r = Fluent::Supervisor.load_config(tmp_dir, params)
    assert_equal :json, r[:logger].format
    assert_equal '%FT%T.%L%z', r[:logger].time_format
  end

  def test_load_config_for_daemonize
    tmp_dir = "#{TMP_DIR}/dir/test_load_config.conf"
    conf_info_str = %[
<system>
  log_level info
</system>
]
    conf_debug_str = %[
<system>
  log_level debug
</system>
]

    now = Time.now
    Timecop.freeze(now)

    write_config tmp_dir, conf_info_str

    params = {}
    params['workers'] = 1
    params['use_v1_config'] = true
    params['log_path'] = 'test/tmp/supervisor/log'
    params['suppress_repeated_stacktrace'] = true
    params['log_level'] = Fluent::Log::LEVEL_INFO
    params['daemonize'] = './fluentd.pid'
    params['conf_encoding'] = 'utf-8'
    load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_dir, params) }

    # first call
    se_config = load_config_proc.call
    assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
    assert_equal true, se_config[:suppress_repeated_stacktrace]
    assert_equal 'spawn', se_config[:worker_type]
    assert_equal 1, se_config[:workers]
    assert_equal false, se_config[:log_stdin]
    assert_equal false, se_config[:log_stdout]
    assert_equal false, se_config[:log_stderr]
    assert_equal true, se_config[:enable_heartbeat]
    assert_equal false, se_config[:auto_heartbeat]
    assert_equal true, se_config[:daemonize]
    assert_equal './fluentd.pid', se_config[:pid_path]

    # second call immediately(reuse config)
    se_config = load_config_proc.call
    pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
    pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
    assert_nil pre_config_mtime
    assert_nil pre_loadtime

    Timecop.freeze(now + 5)

    # third call after 5 seconds(don't reuse config)
    se_config = load_config_proc.call
    pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
    pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
    assert_not_nil pre_config_mtime
    assert_not_nil pre_loadtime

    # fourth call immediately(reuse config)
    se_config = load_config_proc.call
    # test that pre_config_mtime and pre_loadtime are not changed from previous one because reused pre_config
    assert_equal pre_config_mtime, se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
    assert_equal pre_loadtime, se_config[:windows_daemon_cmdline][5]['pre_loadtime']

    write_config tmp_dir, conf_debug_str

    # fifth call after changed conf file(don't reuse config)
    se_config = load_config_proc.call
    assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
  ensure
    Timecop.return
  end

  def test_logger
    opts = Fluent::Supervisor.default_options
    sv = Fluent::Supervisor.new(opts)
    log = sv.instance_variable_get(:@log)
    log.init(:standalone, 0)
    logger = $log.instance_variable_get(:@logger)

    assert_equal Fluent::Log::LEVEL_INFO, $log.level

    # test that DaemonLogger#level= overwrites Fluent.log#level
    logger.level = 'debug'
    assert_equal Fluent::Log::LEVEL_DEBUG, $log.level

    assert_equal 5, logger.instance_variable_get(:@rotate_age)
    assert_equal 1048576, logger.instance_variable_get(:@rotate_size)
  end

  data(
    daily_age: 'daily',
    weekly_age: 'weekly',
    monthly_age: 'monthly',
    integer_age: 2,
  )
  def test_logger_with_rotate_age_and_rotate_size(rotate_age)
    opts = Fluent::Supervisor.default_options.merge(
      log_path: "#{TMP_DIR}/test",
      log_rotate_age: rotate_age,
      log_rotate_size: 10
    )
    sv = Fluent::Supervisor.new(opts)
    log = sv.instance_variable_get(:@log)
    log.init(:standalone, 0)

    assert_equal Fluent::LogDeviceIO, $log.out.class
    assert_equal rotate_age, $log.out.instance_variable_get(:@shift_age)
    assert_equal 10, $log.out.instance_variable_get(:@shift_size)
  end

  sub_test_case "system log rotation" do
    # Parses +text+ as a config and returns its element named 'system'.
    def parse_text(text)
      basepath = File.expand_path(File.dirname(__FILE__) + '/../../')
      Fluent::Config.parse(text, '(test)', basepath, true).elements.find { |e| e.name == 'system' }
    end

    def test_override_default_log_rotate
      Tempfile.open do |file|
        config = parse_text(<<-EOS)
          <system>
            <log>
              rotate_age 3
              rotate_size 300
            </log>
          </system>
        EOS
        file.puts(config)
        file.flush
        opts = Fluent::Supervisor.default_options.merge(
          log_path: "#{TMP_DIR}/test.log",
          config_path: file.path
        )
        sv = Fluent::Supervisor.new(opts)

        log = sv.instance_variable_get(:@log)
        log.init(:standalone, 0)
        logger = $log.instance_variable_get(:@logger)

        assert_equal([3, 300],
                     [logger.instance_variable_get(:@rotate_age),
                      logger.instance_variable_get(:@rotate_size)])
      end
    end

    def test_override_default_log_rotate_with_yaml_config
      Tempfile.open do |file|
        config = <<-EOS
          system:
            log:
              rotate_age: 3
              rotate_size: 300
        EOS
        file.puts(config)
        file.flush
        opts = Fluent::Supervisor.default_options.merge(
          log_path: "#{TMP_DIR}/test.log",
          config_path: file.path,
          config_file_type: :yaml,
        )
        sv = Fluent::Supervisor.new(opts)

        log = sv.instance_variable_get(:@log)
        log.init(:standalone, 0)
        logger = $log.instance_variable_get(:@logger)

        assert_equal([3, 300],
                     [logger.instance_variable_get(:@rotate_age),
                      logger.instance_variable_get(:@rotate_size)])
      end
    end
  end

  def test_inline_config
    omit 'this feature is deprecated. see https://github.com/fluent/fluentd/issues/2711'

    opts = Fluent::Supervisor.default_options
    opts[:inline_config] = '-'
    sv = Fluent::Supervisor.new(opts)
    assert_equal '-', sv.instance_variable_get(:@inline_config)

    inline_config = '\n@type stdout\n'
    stub(STDIN).read { inline_config }
    stub(Fluent::Config).build                                # to skip
    stub(sv).build_system_config { Fluent::SystemConfig.new } # to skip

    sv.configure
    assert_equal inline_config, sv.instance_variable_get(:@inline_config)
  end

  def test_log_level_affects
    opts = Fluent::Supervisor.default_options
    sv = Fluent::Supervisor.new(opts)

    c = Fluent::Config::Element.new('system', '', { 'log_level' => 'error' }, [])
    stub(Fluent::Config).build { config_element('ROOT', '', {}, [c]) }

    sv.configure
    assert_equal Fluent::Log::LEVEL_ERROR, $log.level
  end

  def test_enable_shared_socket
    server = DummyServer.new
    begin
      ENV.delete('SERVERENGINE_SOCKETMANAGER_PATH')
      server.before_run
      sleep 0.1 if Fluent.windows? # Wait for starting windows event thread
      assert_not_nil(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
    ensure
      server.after_run
      ENV.delete('SERVERENGINE_SOCKETMANAGER_PATH')
    end
  end

  def test_disable_shared_socket
    server = DummyServer.new
    def server.config
      {
        :disable_shared_socket => true,
      }
    end
    begin
      ENV.delete('SERVERENGINE_SOCKETMANAGER_PATH')
      server.before_run
      sleep 0.1 if Fluent.windows? # Wait for starting windows event thread
      assert_nil(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
    ensure
      server.after_run
      ENV.delete('SERVERENGINE_SOCKETMANAGER_PATH')
    end
  end

  # Replaces the global $log with a Fluent::Log writing to a
  # Fluent::Test::DummyLogDevice at DEBUG level.
  def create_debug_dummy_logger
    dl_opts = {}
    dl_opts[:log_level] = ServerEngine::DaemonLogger::DEBUG
    logdev = Fluent::Test::DummyLogDevice.new
    logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
    $log = Fluent::Log.new(logger)
  end

  # Replaces the global $log with a Fluent::Log writing to a
  # Fluent::Test::DummyLogDevice at INFO level.
  def create_info_dummy_logger
    dl_opts = {}
    dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO
    logdev = Fluent::Test::DummyLogDevice.new
    logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
    $log = Fluent::Log.new(logger)
  end
end