require_relative 'helper' require 'fluent/event_router' require 'fluent/system_config' require 'fluent/supervisor' require_relative 'test_plugin_classes' require 'net/http' require 'uri' require 'fileutils' class SupervisorTest < ::Test::Unit::TestCase class DummyServer include Fluent::ServerModule attr_accessor :rpc_endpoint, :enable_get_dump def config {} end end TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/tmp/supervisor#{ENV['TEST_ENV_NUMBER']}") TMP_ROOT_DIR = File.join(TMP_DIR, 'root') def setup FileUtils.rm_rf(TMP_DIR) FileUtils.mkdir_p(TMP_DIR) end def write_config(path, data) FileUtils.mkdir_p(File.dirname(path)) File.open(path, "w") {|f| f.write data } end def test_system_config opts = Fluent::Supervisor.default_options sv = Fluent::Supervisor.new(opts) conf_data = <<-EOC rpc_endpoint 127.0.0.1:24445 suppress_repeated_stacktrace true suppress_config_dump true without_source true enable_get_dump true process_name "process_name" log_level info root_dir #{TMP_ROOT_DIR} format json time_format %Y bind 127.0.0.1 port 24321 scope server1 backup_path /tmp/backup host 127.0.0.1 port 24321 timeout 2 EOC conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true) sys_conf = sv.__send__(:build_system_config, conf) assert_equal '127.0.0.1:24445', sys_conf.rpc_endpoint assert_equal true, sys_conf.suppress_repeated_stacktrace assert_equal true, sys_conf.suppress_config_dump assert_equal true, sys_conf.without_source assert_equal true, sys_conf.enable_get_dump assert_equal "process_name", sys_conf.process_name assert_equal 2, sys_conf.log_level assert_equal TMP_ROOT_DIR, sys_conf.root_dir assert_equal :json, sys_conf.log.format assert_equal '%Y', sys_conf.log.time_format counter_server = sys_conf.counter_server assert_equal '127.0.0.1', counter_server.bind assert_equal 24321, counter_server.port assert_equal 'server1', counter_server.scope assert_equal '/tmp/backup', counter_server.backup_path counter_client = sys_conf.counter_client assert_equal '127.0.0.1', counter_client.host assert_equal 24321, counter_client.port assert_equal 2, counter_client.timeout end def test_main_process_signal_handlers omit "Windows cannot handle signals" if Fluent.windows? create_info_dummy_logger opts = Fluent::Supervisor.default_options sv = Fluent::Supervisor.new(opts) sv.send(:install_main_process_signal_handlers) begin Process.kill :USR1, $$ rescue end sleep 1 info_msg = '[info]: force flushing buffered events' + "\n" assert{ $log.out.logs.first.end_with?(info_msg) } ensure $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset) end def test_supervisor_signal_handler omit "Windows cannot handle signals" if Fluent.windows? create_debug_dummy_logger server = DummyServer.new server.install_supervisor_signal_handlers begin Process.kill :USR1, $$ rescue end sleep 1 debug_msg = '[debug]: fluentd supervisor process get SIGUSR1' logs = $log.out.logs assert{ logs.any?{|log| log.include?(debug_msg) } } ensure $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset) end def test_rpc_server omit "Windows cannot handle signals" if Fluent.windows? create_info_dummy_logger opts = Fluent::Supervisor.default_options sv = Fluent::Supervisor.new(opts) conf_data = <<-EOC rpc_endpoint 0.0.0.0:24447 EOC conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true) sys_conf = sv.__send__(:build_system_config, conf) server = DummyServer.new server.rpc_endpoint = sys_conf.rpc_endpoint server.enable_get_dump = sys_conf.enable_get_dump server.run_rpc_server sv.send(:install_main_process_signal_handlers) Net::HTTP.get URI.parse('http://0.0.0.0:24447/api/plugins.flushBuffers') info_msg = '[info]: force flushing buffered events' + "\n" server.stop_rpc_server # In TravisCI with OSX(Xcode), it seems that can't use rpc server. # This test will be passed in such environment. pend unless $log.out.logs.first assert{ $log.out.logs.first.end_with?(info_msg) } ensure $log.out.reset end def test_load_config tmp_dir = "#{TMP_DIR}/dir/test_load_config.conf" conf_info_str = %[ log_level info ] conf_debug_str = %[ log_level debug ] now = Time.now Timecop.freeze(now) write_config tmp_dir, conf_info_str params = {} params['workers'] = 1 params['use_v1_config'] = true params['log_path'] = 'test/tmp/supervisor/log' params['suppress_repeated_stacktrace'] = true params['log_level'] = Fluent::Log::LEVEL_INFO params['conf_encoding'] = 'utf-8' load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_dir, params) } # first call se_config = load_config_proc.call assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level] assert_equal true, se_config[:suppress_repeated_stacktrace] assert_equal 'spawn', se_config[:worker_type] assert_equal 1, se_config[:workers] assert_equal false, se_config[:log_stdin] assert_equal false, se_config[:log_stdout] assert_equal false, se_config[:log_stderr] assert_equal true, se_config[:enable_heartbeat] assert_equal false, se_config[:auto_heartbeat] assert_equal false, se_config[:daemonize] assert_nil se_config[:pid_path] # second call immediately(reuse config) se_config = load_config_proc.call pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime'] pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime'] assert_nil pre_config_mtime assert_nil pre_loadtime Timecop.freeze(now + 5) # third call after 5 seconds(don't reuse config) se_config = load_config_proc.call pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime'] pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime'] assert_not_nil pre_config_mtime assert_not_nil pre_loadtime # forth call immediately(reuse config) se_config = load_config_proc.call # test that pre_config_mtime and pre_loadtime are not changed from previous one because reused pre_config assert_equal pre_config_mtime, se_config[:windows_daemon_cmdline][5]['pre_config_mtime'] assert_equal pre_loadtime, se_config[:windows_daemon_cmdline][5]['pre_loadtime'] write_config tmp_dir, conf_debug_str # fifth call after changed conf file(don't reuse config) se_config = load_config_proc.call assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level] ensure Timecop.return end def test_load_config_for_logger tmp_dir = "#{TMP_DIR}/dir/test_load_config_log.conf" conf_info_str = %[ format json time_format %FT%T.%L%z ] write_config tmp_dir, conf_info_str params = { 'use_v1_config' => true, 'conf_encoding' => 'utf8', 'log_level' => Fluent::Log::LEVEL_INFO, 'log_path' => 'test/tmp/supervisor/log', 'workers' => 1, 'log_format' => :json, 'log_time_format' => '%FT%T.%L%z', } r = Fluent::Supervisor.load_config(tmp_dir, params) assert_equal :json, r[:logger].format assert_equal '%FT%T.%L%z', r[:logger].time_format end def test_load_config_for_daemonize tmp_dir = "#{TMP_DIR}/dir/test_load_config.conf" conf_info_str = %[ log_level info ] conf_debug_str = %[ log_level debug ] now = Time.now Timecop.freeze(now) write_config tmp_dir, conf_info_str params = {} params['workers'] = 1 params['use_v1_config'] = true params['log_path'] = 'test/tmp/supervisor/log' params['suppress_repeated_stacktrace'] = true params['log_level'] = Fluent::Log::LEVEL_INFO params['daemonize'] = './fluentd.pid' params['conf_encoding'] = 'utf-8' load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_dir, params) } # first call se_config = load_config_proc.call assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level] assert_equal true, se_config[:suppress_repeated_stacktrace] assert_equal 'spawn', se_config[:worker_type] assert_equal 1, se_config[:workers] assert_equal false, se_config[:log_stdin] assert_equal false, se_config[:log_stdout] assert_equal false, se_config[:log_stderr] assert_equal true, se_config[:enable_heartbeat] assert_equal false, se_config[:auto_heartbeat] assert_equal true, se_config[:daemonize] assert_equal './fluentd.pid', se_config[:pid_path] # second call immediately(reuse config) se_config = load_config_proc.call pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime'] pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime'] assert_nil pre_config_mtime assert_nil pre_loadtime Timecop.freeze(now + 5) # third call after 6 seconds(don't reuse config) se_config = load_config_proc.call pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime'] pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime'] assert_not_nil pre_config_mtime assert_not_nil pre_loadtime # forth call immediately(reuse config) se_config = load_config_proc.call # test that pre_config_mtime and pre_loadtime are not changed from previous one because reused pre_config assert_equal pre_config_mtime, se_config[:windows_daemon_cmdline][5]['pre_config_mtime'] assert_equal pre_loadtime, se_config[:windows_daemon_cmdline][5]['pre_loadtime'] write_config tmp_dir, conf_debug_str # fifth call after changed conf file(don't reuse config) se_config = load_config_proc.call assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level] ensure Timecop.return end def test_logger opts = Fluent::Supervisor.default_options sv = Fluent::Supervisor.new(opts) log = sv.instance_variable_get(:@log) log.init(:standalone, 0) logger = $log.instance_variable_get(:@logger) assert_equal Fluent::Log::LEVEL_INFO, $log.level # test that DamonLogger#level= overwrites Fluent.log#level logger.level = 'debug' assert_equal Fluent::Log::LEVEL_DEBUG, $log.level assert_equal 5, logger.instance_variable_get(:@rotate_age) assert_equal 1048576, logger.instance_variable_get(:@rotate_size) end data( daily_age: 'daily', weekly_age: 'weekly', monthly_age: 'monthly', integer_age: 2, ) def test_logger_with_rotate_age_and_rotate_size(rotate_age) opts = Fluent::Supervisor.default_options.merge( log_path: "#{TMP_DIR}/test", log_rotate_age: rotate_age, log_rotate_size: 10 ) sv = Fluent::Supervisor.new(opts) log = sv.instance_variable_get(:@log) log.init(:standalone, 0) assert_equal Fluent::LogDeviceIO, $log.out.class assert_equal rotate_age, $log.out.instance_variable_get(:@shift_age) assert_equal 10, $log.out.instance_variable_get(:@shift_size) end def test_inline_config omit 'this feature is deprecated. see https://github.com/fluent/fluentd/issues/2711' opts = Fluent::Supervisor.default_options opts[:inline_config] = '-' sv = Fluent::Supervisor.new(opts) assert_equal '-', sv.instance_variable_get(:@inline_config) inline_config = '\n@type stdout\n' stub(STDIN).read { inline_config } stub(Fluent::Config).build # to skip stub(sv).build_system_config { Fluent::SystemConfig.new } # to skip sv.configure assert_equal inline_config, sv.instance_variable_get(:@inline_config) end def test_log_level_affects opts = Fluent::Supervisor.default_options sv = Fluent::Supervisor.new(opts) c = Fluent::Config::Element.new('system', '', { 'log_level' => 'error' }, []) stub(Fluent::Config).build { config_element('ROOT', '', {}, [c]) } sv.configure assert_equal Fluent::Log::LEVEL_ERROR, $log.level end def create_debug_dummy_logger dl_opts = {} dl_opts[:log_level] = ServerEngine::DaemonLogger::DEBUG logdev = Fluent::Test::DummyLogDevice.new logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) $log = Fluent::Log.new(logger) end def create_info_dummy_logger dl_opts = {} dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO logdev = Fluent::Test::DummyLogDevice.new logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) $log = Fluent::Log.new(logger) end end