require_relative '../helper' # require 'fluent/command/fluentd' # don't require it... it runs immediately require 'fileutils' require 'timeout' class TestFluentdCommand < ::Test::Unit::TestCase TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/command/fluentd#{ENV['TEST_ENV_NUMBER']}") SUPERVISOR_PID_PATTERN = /starting fluentd-[.0-9]+ pid=(\d+)/ WORKER_PID_PATTERN = /starting fluentd worker pid=(\d+) / setup do FileUtils.rm_rf(TMP_DIR) FileUtils.mkdir_p(TMP_DIR) @supervisor_pid = nil @worker_pids = [] end def process_exist?(pid) begin r = Process.waitpid(pid, Process::WNOHANG) return true if r.nil? false rescue SystemCallError false end end def create_conf_file(name, content, ext_enc = 'utf-8') conf_path = File.join(TMP_DIR, name) File.open(conf_path, "w:#{ext_enc}:utf-8") do |file| file.write content end conf_path end def create_plugin_file(name, content) file_path = File.join(TMP_DIR, 'plugin', name) FileUtils.mkdir_p(File.dirname(file_path)) File.open(file_path, 'w') do |file| file.write content end file_path end def create_cmdline(conf_path, *fluentd_options) if Fluent.windows? cmd_path = File.expand_path(File.dirname(__FILE__) + "../../../bin/fluentd") ["bundle", "exec", ServerEngine.ruby_bin_path, cmd_path, "-c", conf_path, *fluentd_options] else cmd_path = File.expand_path(File.dirname(__FILE__) + "../../../bin/fluentd") ["bundle", "exec", cmd_path, "-c", conf_path, *fluentd_options] end end def execute_command(cmdline, chdir=TMP_DIR, env = {}) null_stream = File.open(File::NULL, 'w') gemfile_path = File.expand_path(File.dirname(__FILE__) + "../../../Gemfile") env = { "BUNDLE_GEMFILE" => gemfile_path }.merge(env) cmdname = cmdline.shift arg0 = "testing-fluentd" # p(here: "executing process", env: env, cmdname: cmdname, arg0: arg0, args: cmdline) IO.popen(env, [[cmdname, arg0], *cmdline], chdir: chdir, err: [:child, :out]) do |io| pid = io.pid begin yield pid, io # p(here: "execute command", pid: pid, worker_pids: @worker_pids) ensure Process.kill(:KILL, pid) rescue nil if @supervisor_pid Process.kill(:KILL, @supervisor_pid) rescue nil end @worker_pids.each do |cpid| Process.kill(:KILL, cpid) rescue nil end # p(here: "execute command", pid: pid, exist: process_exist?(pid), worker_pids: @worker_pids, exists: @worker_pids.map{|i| process_exist?(i) }) Timeout.timeout(10){ sleep 0.1 while process_exist?(pid) } end end ensure null_stream.close rescue nil end def eager_read(io) buf = +'' loop do b = io.read_nonblock(1024, nil, exception: false) if b == :wait_readable || b.nil? return buf end buf << b end end def assert_log_matches(cmdline, *pattern_list, patterns_not_match: [], timeout: 10, env: {}) matched = false assert_error_msg = "matched correctly" stdio_buf = "" begin execute_command(cmdline, TMP_DIR, env) do |pid, stdout| begin waiting(timeout) do while process_exist?(pid) && !matched readables, _, _ = IO.select([stdout], nil, nil, 1) next unless readables break if readables.first.eof? buf = eager_read(readables.first) # puts buf stdio_buf << buf lines = stdio_buf.split("\n") if pattern_list.all?{|ptn| lines.any?{|line| ptn.is_a?(Regexp) ? ptn.match(line) : line.include?(ptn) } } matched = true end end end ensure if SUPERVISOR_PID_PATTERN =~ stdio_buf @supervisor_pid = $1.to_i end stdio_buf.scan(WORKER_PID_PATTERN) do |worker_pid| @worker_pids << worker_pid.first.to_i end end end rescue Timeout::Error assert_error_msg = "execution timeout with command out:\n" + stdio_buf rescue => e assert_error_msg = "unexpected error in launching fluentd: #{e.inspect}\n" + stdio_buf end assert matched, assert_error_msg unless patterns_not_match.empty? lines = stdio_buf.split("\n") patterns_not_match.each do |ptn| matched_wrongly = if ptn.is_a? Regexp lines.any?{|line| ptn.match(line) } else lines.any?{|line| line.include?(ptn) } end assert_false matched_wrongly, "pattern exists in logs wrongly:\n" + stdio_buf end end end def assert_fluentd_fails_to_start(cmdline, *pattern_list, timeout: 10) # empty_list.all?{ ... } is always true matched = false running = false assert_error_msg = "failed to start correctly" stdio_buf = "" begin execute_command(cmdline) do |pid, stdout| begin waiting(timeout) do while process_exist?(pid) && !running readables, _, _ = IO.select([stdout], nil, nil, 1) next unless readables next if readables.first.eof? stdio_buf << eager_read(readables.first) lines = stdio_buf.split("\n") if lines.any?{|line| line.include?("fluentd worker is now running") } running = true end if pattern_list.all?{|ptn| lines.any?{|line| ptn.is_a?(Regexp) ? ptn.match(line) : line.include?(ptn) } } matched = true end end end ensure if SUPERVISOR_PID_PATTERN =~ stdio_buf @supervisor_pid = $1.to_i end stdio_buf.scan(WORKER_PID_PATTERN) do |worker_pid| @worker_pids << worker_pid.first.to_i end end end rescue Timeout::Error assert_error_msg = "execution timeout with command out:\n" + stdio_buf rescue => e assert_error_msg = "unexpected error in launching fluentd: #{e.inspect}\n" + stdio_buf assert false, assert_error_msg end assert !running, "fluentd started to run incorrectly:\n" + stdio_buf unless matched assert_error_msg = "fluentd failed to start, without specified regular expressions:\n" + stdio_buf end assert matched, assert_error_msg end sub_test_case 'with valid configuration' do test 'runs successfully' do conf = < @type dummy @id dummy @label @dummydata tag dummy dummy {"message": "yay!"} CONF conf_path = create_conf_file('valid.conf', conf) assert File.exist?(conf_path) assert_log_matches(create_cmdline(conf_path), "fluentd worker is now running", 'worker=0') end end sub_test_case 'with --conf-encoding' do test 'runs successfully' do conf = < @type dummy tag dummy dummy {"message": "yay!"} @type null CONF conf_path = create_conf_file('shift_jis.conf', conf, 'shift_jis') assert_log_matches(create_cmdline(conf_path, '--conf-encoding', 'shift_jis'), "fluentd worker is now running", 'worker=0') end test 'failed to run by invalid encoding' do conf = < @type dummy tag dummy dummy {"message": "yay!"} @type null CONF conf_path = create_conf_file('shift_jis.conf', conf, 'shift_jis') assert_fluentd_fails_to_start(create_cmdline(conf_path), "invalid byte sequence in UTF-8") end end sub_test_case 'with system configuration about root directory' do setup do @root_path = File.join(TMP_DIR, "rootpath") FileUtils.rm_rf(@root_path) @conf = < root_dir #{@root_path} @type dummy @id dummy @label @dummydata tag dummy dummy {"message": "yay!"} CONF end test 'use the specified existing directory as root' do FileUtils.mkdir_p(@root_path) conf_path = create_conf_file('existing_root_dir.conf', @conf) assert Dir.exist?(@root_path) assert_log_matches(create_cmdline(conf_path), "fluentd worker is now running", 'worker=0') end test 'creates the specified root directory if missing' do conf_path = create_conf_file('missing_root_dir.conf', @conf) assert_false Dir.exist?(@root_path) assert_log_matches(create_cmdline(conf_path), "fluentd worker is now running", 'worker=0') assert Dir.exist?(@root_path) end test 'fails to launch fluentd if specified root path is invalid path for directory' do File.open(@root_path, 'w') do |_| # create file and close it end conf_path = create_conf_file('existing_root_dir.conf', @conf) assert_fluentd_fails_to_start( create_cmdline(conf_path), "non directory entry exists:#{@root_path}", ) end end sub_test_case 'configured to route log events to plugins' do setup do @basic_conf = < @type dummy @id dummy tag dummy dummy {"message": "yay!"} @type null @id blackhole CONF end test 'by top level section' do conf = @basic_conf + < @type stdout CONF conf_path = create_conf_file('logevent_1.conf', conf) assert_log_matches( create_cmdline(conf_path), "fluentd worker is now running", 'fluent.info: {"worker":0,"message":"fluentd worker is now running worker=0"}', "define to capture fluentd logs in top level is deprecated. Use