require_relative '../helper'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_exec_filter'
require 'fileutils'

# Tests for Fluent::Plugin::ExecFilterOutput.
#
# Every scenario is data-driven and runs twice: once with a section-style
# configuration ('with sections') and once with the flat parameter names
# ('traditional', the *_COMPAT constants). Both variants of a scenario are
# expected to produce identical formatted input and identical emitted events.
#
# NOTE: the configuration strings are line-oriented; each directive must stay
# on its own line and no Ruby comments may be placed inside the %[...] literals.
class ExecFilterOutputTest < Test::Unit::TestCase
  def setup
    Fluent::Test.setup
  end

  # Baseline configuration: pipe records through `cat` using 3 child processes,
  # with string-formatted time on both the way in and the way out.
  CONFIG = %[
    command cat
    num_children 3
    <inject>
      tag_key tag
      time_key time_in
      time_type string
      time_format %Y-%m-%d %H:%M:%S
    </inject>
    <format>
      keys ["time_in", "tag", "k1"]
    </format>
    <parse>
      keys ["time_out", "tag", "k2"]
    </parse>
    <extract>
      tag_key tag
      time_key time_out
      time_type string
      time_format %Y-%m-%d %H:%M:%S
    </extract>
  ]

  # Same scenario as CONFIG, expressed with the traditional flat parameters.
  CONFIG_COMPAT = %[
    command cat
    in_keys time_in,tag,k1
    out_keys time_out,tag,k2
    tag_key tag
    in_time_key time_in
    out_time_key time_out
    time_format %Y-%m-%d %H:%M:%S
    localtime
    num_children 3
  ]

  # Builds an output test driver for the plugin, configured with +conf+.
  def create_driver(conf)
    Fluent::Test::Driver::Output.new(Fluent::Plugin::ExecFilterOutput).configure(conf)
  end

  # Probed once at class-load time: true when the local `sed` exits
  # successfully when invoked with `--unbuffered -l`.
  SED_SUPPORT_UNBUFFERED_OPTION = ->(){
    system("echo xxx | sed --unbuffered -l -e 's/x/y/g' >#{IO::NULL} 2>&1")
    $?.success?
  }.call
  # Extra flag interpolated into the sed-based configurations below
  # (empty when the probe above failed).
  SED_UNBUFFERED_OPTION = SED_SUPPORT_UNBUFFERED_OPTION ? '--unbuffered' : ''

  data(
    'with sections' => CONFIG,
    'traditional' => CONFIG_COMPAT,
  )
  test 'configure' do |conf|
    d = create_driver(conf)

    assert_false d.instance.parser.estimate_current_event

    assert_equal ["time_in","tag","k1"], d.instance.formatter.keys
    assert_equal ["time_out","tag","k2"], d.instance.parser.keys
    assert_equal "tag", d.instance.inject_config.tag_key
    assert_equal "tag", d.instance.extract_config.tag_key
    assert_equal "time_in", d.instance.inject_config.time_key
    assert_equal "time_out", d.instance.extract_config.time_key
    assert_equal "%Y-%m-%d %H:%M:%S", d.instance.inject_config.time_format
    assert_equal "%Y-%m-%d %H:%M:%S", d.instance.extract_config.time_format
    assert_equal true, d.instance.inject_config.localtime
    assert_equal 3, d.instance.num_children

    # The command line must be kept verbatim, arguments included.
    d = create_driver %[
      command sed -l -e s/foo/bar/
      in_keys time,k1
      out_keys time,k2
      tag xxx
      time_key time
      num_children 3
    ]
    assert_equal "sed -l -e s/foo/bar/", d.instance.command

    # Tag prefix options can be appended to either configuration style.
    d = create_driver(conf + %[
      remove_prefix before
      add_prefix after
    ])
    assert_equal "before", d.instance.remove_prefix
    assert_equal "after", d.instance.add_prefix
  end

  data(
    'with sections' => CONFIG,
    'traditional' => CONFIG_COMPAT,
  )
  test 'emit events with TSV format' do |conf|
    d = create_driver(conf)
    time = event_time("2011-01-02 13:14:15")

    d.run(default_tag: 'test', expect_emits: 2, timeout: 10) do
      # sleep 0.1 until d.instance.children && !d.instance.children.empty? && d.instance.children.all?{|c| c.finished == false }
      d.feed(time, {"k1"=>1})
      d.feed(time, {"k1"=>2})
    end

    # What was written to the child process ...
    assert_equal "2011-01-02 13:14:15\ttest\t1\n", d.formatted[0]
    assert_equal "2011-01-02 13:14:15\ttest\t2\n", d.formatted[1]

    # ... and what was parsed back from it and emitted.
    events = d.events
    assert_equal 2, events.length
    assert_equal_event_time time, events[0][1]
    assert_equal ["test", time, {"k2"=>"1"}], events[0]
    assert_equal_event_time time, events[1][1]
    assert_equal ["test", time, {"k2"=>"2"}], events[1]
  end

  # `cat` pipeline with a fixed output tag and unixtime timestamps
  # (no time_format given).
  CONFIG_WITHOUT_TIME_FORMAT = %[
    command cat
    num_children 3
    tag xxx
    <inject>
      time_key time
      time_type unixtime
    </inject>
    <format>
      keys time,k1
    </format>
    <parse>
      keys time,k2
    </parse>
    <extract>
      time_key time
      time_type unixtime
    </extract>
  ]
  CONFIG_WITHOUT_TIME_FORMAT_COMPAT = %[
    command cat
    in_keys time,k1
    out_keys time,k2
    tag xxx
    time_key time
    num_children 3
  ]

  data(
    'with sections' => CONFIG_WITHOUT_TIME_FORMAT,
    'traditional' => CONFIG_WITHOUT_TIME_FORMAT_COMPAT,
  )
  test 'emit events without time format configuration' do |conf|
    d = create_driver(conf)
    time = event_time("2011-01-02 13:14:15 +0900")

    d.run(default_tag: 'test', expect_emits: 2, timeout: 10) do
      d.feed(time, {"k1"=>1})
      d.feed(time, {"k1"=>2})
    end

    assert_equal "1293941655\t1\n", d.formatted[0]
    assert_equal "1293941655\t2\n", d.formatted[1]

    events = d.events
    assert_equal 2, events.length
    assert_equal_event_time time, events[0][1]
    assert_equal ["xxx", time, {"k2"=>"1"}], events[0]
    assert_equal_event_time time, events[1][1]
    assert_equal ["xxx", time, {"k2"=>"2"}], events[1]
  end

  # `grep -v poo` pipeline: lines containing "poo" never come back.
  CONFIG_TO_DO_GREP = %[
    command grep --line-buffered -v poo
    num_children 3
    tag xxx
    <inject>
      time_key time
      time_type unixtime
    </inject>
    <format>
      keys time, val1
    </format>
    <parse>
      keys time, val2
    </parse>
    <extract>
      time_key time
      time_type unixtime
    </extract>
  ]
  CONFIG_TO_DO_GREP_COMPAT = %[
    command grep --line-buffered -v poo
    in_keys time,val1
    out_keys time,val2
    tag xxx
    time_key time
    num_children 3
  ]

  data(
    'with sections' => CONFIG_TO_DO_GREP,
    'traditional' => CONFIG_TO_DO_GREP_COMPAT,
  )
  test 'emit events through grep command' do |conf|
    d = create_driver(conf)
    time = event_time("2011-01-02 13:14:15 +0900")

    # Two records go in, but only one is expected back.
    d.run(default_tag: 'test', expect_emits: 1, timeout: 10) do
      d.feed(time, {"val1"=>"sed-ed value poo"})
      d.feed(time, {"val1"=>"sed-ed value foo"})
    end

    assert_equal "1293941655\tsed-ed value poo\n", d.formatted[0]
    assert_equal "1293941655\tsed-ed value foo\n", d.formatted[1]

    events = d.events
    assert_equal 1, events.length
    assert_equal_event_time time, events[0][1]
    assert_equal ["xxx", time, {"val2"=>"sed-ed value foo"}], events[0]
  end

  # `sed s/foo/bar/` pipeline: every line comes back, "foo" rewritten to "bar".
  CONFIG_TO_DO_SED = %[
    command sed #{SED_UNBUFFERED_OPTION} -l -e s/foo/bar/
    num_children 3
    tag xxx
    <inject>
      time_key time
      time_type unixtime
    </inject>
    <format>
      keys time, val1
    </format>
    <parse>
      keys time, val2
    </parse>
    <extract>
      time_key time
      time_type unixtime
    </extract>
  ]
  CONFIG_TO_DO_SED_COMPAT = %[
    command sed #{SED_UNBUFFERED_OPTION} -l -e s/foo/bar/
    in_keys time,val1
    out_keys time,val2
    tag xxx
    time_key time
    num_children 3
  ]

  data(
    'with sections' => CONFIG_TO_DO_SED,
    'traditional' => CONFIG_TO_DO_SED_COMPAT,
  )
  test 'emit events through sed command' do |conf|
    d = create_driver(conf)
    time = event_time("2011-01-02 13:14:15 +0900")

    # Both records are asserted on below, so wait for both emits rather than
    # relying on the second one arriving before the driver stops.
    d.run(default_tag: 'test', expect_emits: 2, timeout: 10) do
      d.feed(time, {"val1"=>"sed-ed value poo"})
      d.feed(time, {"val1"=>"sed-ed value foo"})
    end

    assert_equal "1293941655\tsed-ed value poo\n", d.formatted[0]
    assert_equal "1293941655\tsed-ed value foo\n", d.formatted[1]

    events = d.events
    assert_equal 2, events.length
    assert_equal_event_time time, events[0][1]
    assert_equal ["xxx", time, {"val2"=>"sed-ed value poo"}], events[0]
    assert_equal_event_time time, events[1][1]
    assert_equal ["xxx", time, {"val2"=>"sed-ed value bar"}], events[1]
  end

  # sed pipeline where the tag travels through the child process: the "input"
  # prefix is removed before writing and "output" is added to the tag read back.
  CONFIG_TO_DO_SED_WITH_TAG_MODIFY = %[
    command sed #{SED_UNBUFFERED_OPTION} -l -e s/foo/bar/
    num_children 3
    remove_prefix input
    add_prefix output
    <inject>
      tag_key tag
      time_key time
    </inject>
    <format>
      keys tag, time, val1
    </format>
    <parse>
      keys tag, time, val2
    </parse>
    <extract>
      tag_key tag
      time_key time
    </extract>
  ]
  CONFIG_TO_DO_SED_WITH_TAG_MODIFY_COMPAT = %[
    command sed #{SED_UNBUFFERED_OPTION} -l -e s/foo/bar/
    in_keys tag,time,val1
    remove_prefix input
    out_keys tag,time,val2
    add_prefix output
    tag_key tag
    time_key time
    num_children 3
  ]

  data(
    'with sections' => CONFIG_TO_DO_SED_WITH_TAG_MODIFY,
    'traditional' => CONFIG_TO_DO_SED_WITH_TAG_MODIFY_COMPAT,
  )
  test 'emit events with add/remove tag prefix' do |conf|
    d = create_driver(conf)
    time = event_time("2011-01-02 13:14:15 +0900")

    d.run(default_tag: 'input.test', expect_emits: 2, timeout: 10) do
      d.feed(time, {"val1"=>"sed-ed value foo"})
      d.feed(time, {"val1"=>"sed-ed value poo"})
    end

    # "input." has been stripped from the tag written to the child ...
    assert_equal "test\t1293941655\tsed-ed value foo\n", d.formatted[0]
    assert_equal "test\t1293941655\tsed-ed value poo\n", d.formatted[1]

    # ... and "output." is prepended to the tag of the emitted events.
    events = d.events
    assert_equal 2, events.length
    assert_equal_event_time time, events[0][1]
    assert_equal ["output.test", time, {"val2"=>"sed-ed value bar"}], events[0]
    assert_equal_event_time time, events[1][1]
    assert_equal ["output.test", time, {"val2"=>"sed-ed value poo"}], events[1]
  end

  # `cat` pipeline that writes only the "message" field (TSV) and parses the
  # child's output as JSON, taking tag and time from the parsed record.
  CONFIG_JSON = %[
    command cat
    <format>
      @type tsv
      keys message
    </format>
    <parse>
      @type json
      stream_buffer_size 1
    </parse>
    <extract>
      tag_key tag
      time_key time
    </extract>
  ]
  CONFIG_JSON_COMPAT = %[
    command cat
    in_keys message
    out_format json
    out_stream_buffer_size 1
    time_key time
    tag_key tag
  ]

  data(
    'with sections' => CONFIG_JSON,
    'traditional' => CONFIG_JSON_COMPAT,
  )
  test 'using json format' do |conf|
    d = create_driver(conf)
    time = event_time("2011-01-02 13:14:15 +0900")

    d.run(default_tag: 'input.test', expect_emits: 1, timeout: 10) do
      i = d.instance
      assert{ i.router }
      d.feed(time, {"message"=>%[{"time":#{time},"tag":"t1","k1":"v1"}]})
    end

    assert_equal '{"time":1293941655,"tag":"t1","k1":"v1"}' + "\n", d.formatted[0]

    # Tag and time of the emitted event come from the JSON payload.
    events = d.events
    assert_equal 1, events.length
    assert_equal_event_time time, events[0][1]
    assert_equal ["t1", time, {"k1"=>"v1"}], events[0]
  end

  # Same settings as CONFIG_JSON; used with a fractional-second time value.
  CONFIG_JSON_WITH_FLOAT_TIME = %[
    command cat
    <format>
      @type tsv
      keys message
    </format>
    <parse>
      @type json
      stream_buffer_size 1
    </parse>
    <extract>
      tag_key tag
      time_key time
    </extract>
  ]
  CONFIG_JSON_WITH_FLOAT_TIME_COMPAT = %[
    command cat
    in_keys message
    out_format json
    out_stream_buffer_size 1
    time_key time
    tag_key tag
  ]

  data(
    'with sections' => CONFIG_JSON_WITH_FLOAT_TIME,
    'traditional' => CONFIG_JSON_WITH_FLOAT_TIME_COMPAT,
  )
  test 'using json format with float time' do |conf|
    d = create_driver(conf)
    time = event_time("2011-01-02 13:14:15.123 +0900")

    # The event is fed with time + 10, while the payload carries `time`; the
    # assertions below expect the emitted event to carry the payload's time.
    d.run(default_tag: 'input.test', expect_emits: 1, timeout: 10) do
      d.feed(time + 10, {"message"=>%[{"time":#{time.sec}.#{time.nsec},"tag":"t1","k1":"v1"}]})
    end

    assert_equal '{"time":1293941655.123000000,"tag":"t1","k1":"v1"}' + "\n", d.formatted[0]

    events = d.events
    assert_equal 1, events.length
    assert_equal_event_time time, events[0][1]
    assert_equal ["t1", time, {"k1"=>"v1"}], events[0]
  end

  # JSON output parsing where the time field is a string in a custom format
  # with nanosecond precision and a zone offset.
  CONFIG_JSON_WITH_TIME_FORMAT = %[
    command cat
    <format>
      @type tsv
      keys message
    </format>
    <parse>
      @type json
      stream_buffer_size 1
    </parse>
    <extract>
      tag_key tag
      time_key time
      time_type string
      time_format %d/%b/%Y %H:%M:%S.%N %z
    </extract>
  ]
  CONFIG_JSON_WITH_TIME_FORMAT_COMPAT = %[
    command cat
    in_keys message
    out_format json
    out_stream_buffer_size 1
    time_key time
    time_format %d/%b/%Y %H:%M:%S.%N %z
    tag_key tag
  ]

  data(
    'with sections' => CONFIG_JSON_WITH_TIME_FORMAT,
    'traditional' => CONFIG_JSON_WITH_TIME_FORMAT_COMPAT,
  )
  test 'using json format with custom time format' do |conf|
    d = create_driver(conf)
    time_str = "28/Feb/2013 12:00:00.123456789 +0900"
    time = event_time(time_str, format: "%d/%b/%Y %H:%M:%S.%N %z")

    # Fed with time + 10; the emitted event is expected to carry the time
    # parsed from the payload's formatted string instead.
    d.run(default_tag: 'input.test', expect_emits: 1, timeout: 10) do
      d.feed(time + 10, {"message"=>%[{"time":"#{time_str}","tag":"t1","k1":"v1"}]})
    end

    assert_equal '{"time":"28/Feb/2013 12:00:00.123456789 +0900","tag":"t1","k1":"v1"}' + "\n", d.formatted[0]

    events = d.events
    assert_equal 1, events.length
    assert_equal_event_time time, events[0][1]
    assert_equal ["t1", time, {"k1"=>"v1"}], events[0]
  end

  # Two long-lived ruby children that echo each input line with their own pid
  # appended as an extra column (parsed back as "child_pid").
  CONFIG_ROUND_ROBIN = %[
    command ruby -e 'STDOUT.sync = true; STDIN.each_line{|line| puts line.chomp + "\t" + Process.pid.to_s }'
    num_children 2
    <inject>
      tag_key tag
      time_key time_in
      time_type string
      time_format %Y-%m-%d %H:%M:%S
    </inject>
    <format>
      keys ["time_in", "tag", "k1"]
    </format>
    <parse>
      keys ["time_out", "tag", "k2", "child_pid"]
    </parse>
    <extract>
      tag_key tag
      time_key time_out
      time_type string
      time_format %Y-%m-%d %H:%M:%S
    </extract>
  ]
  CONFIG_ROUND_ROBIN_COMPAT = %[
    command ruby -e 'STDOUT.sync = true; STDIN.each_line{|line| puts line.chomp + "\t" + Process.pid.to_s }'
    in_keys time_in,tag,k1
    out_keys time_out,tag,k2,child_pid
    tag_key tag
    in_time_key time_in
    out_time_key time_out
    time_format %Y-%m-%d %H:%M:%S
    localtime
    num_children 2
  ]

  data(
    'with sections' => CONFIG_ROUND_ROBIN,
    'traditional' => CONFIG_ROUND_ROBIN_COMPAT,
  )
  test 'using child processes by round robin' do |conf|
    d = create_driver(conf)
    time = event_time('2011-01-02 13:14:15')

    # Each record is flushed on its own, with a pause in between, before the
    # next one is fed.
    d.run(default_tag: 'test', expect_emits: 4) do
      d.feed(time, {"k1" => 0})
      d.flush
      sleep 0.5
      d.feed(time, {"k1" => 1})
      d.flush
      sleep 0.5
      d.feed(time, {"k1" => 2})
      d.flush
      sleep 0.5
      d.feed(time, {"k1" => 3})
    end

    assert_equal "2011-01-02 13:14:15\ttest\t0\n", d.formatted[0]
    assert_equal "2011-01-02 13:14:15\ttest\t1\n", d.formatted[1]
    assert_equal "2011-01-02 13:14:15\ttest\t2\n", d.formatted[2]
    assert_equal "2011-01-02 13:14:15\ttest\t3\n", d.formatted[3]

    events = d.events
    assert_equal 4, events.length

    # Distinct child pids, in order of first appearance.
    pid_list = events.map { |event| event[2]['child_pid'] }.uniq
    assert_equal 2, pid_list.size, "the number of pids should be same with number of child processes: #{pid_list.inspect}"

    # Records must alternate between the two children.
    assert_equal pid_list[0], events[0][2]['child_pid']
    assert_equal pid_list[1], events[1][2]['child_pid']
    assert_equal pid_list[0], events[2][2]['child_pid']
    assert_equal pid_list[1], events[3][2]['child_pid']
  end

  # child process exits per 3 lines
  CONFIG_RESPAWN = %[
    command ruby -e 'STDOUT.sync = true; proc = ->(){line = STDIN.readline.chomp; puts line + "\t" + Process.pid.to_s}; proc.call; proc.call; proc.call'
    num_children 2
    child_respawn -1
    <inject>
      tag_key tag
      time_key time_in
      time_type unixtime
    </inject>
    <format>
      keys ["time_in", "tag", "k1"]
    </format>
    <parse>
      keys ["time_out", "tag", "k2", "child_pid"]
    </parse>
    <extract>
      tag_key tag
      time_key time_out
      time_type unixtime
    </extract>
  ]
  CONFIG_RESPAWN_COMPAT = %[
    command ruby -e 'STDOUT.sync = true; proc = ->(){line = STDIN.readline.chomp; puts line + "\t" + Process.pid.to_s}; proc.call; proc.call; proc.call'
    num_children 2
    child_respawn -1
    in_keys time_in,tag,k1
    out_keys time_out,tag,k2,child_pid
    tag_key tag
    in_time_key time_in
    out_time_key time_out
    # time_format %Y-%m-%d %H:%M:%S
    # localtime
  ]

  data(
    'with sections' => CONFIG_RESPAWN,
    'traditional' => CONFIG_RESPAWN_COMPAT,
  )
  test 'emit events via child processes which exits sometimes' do |conf|
    d = create_driver(conf)
    time = event_time("2011-01-02 13:14:15")

    countup = 0

    # Start the plugin once and keep it running across the feeding rounds below.
    d.run(start: true, shutdown: false)

    assert_equal 2, d.instance.instance_eval{ @_child_process_processes.size }

    # Two rounds of three records each.
    2.times do
      d.run(default_tag: 'test', expect_emits: 3, timeout: 3, force_flush_retry: true, start: false, shutdown: false) do
        3.times do
          d.feed(time, { "k1" => countup })
          countup += 1
        end
      end
    end

    events = d.events
    assert_equal 6, events.length

    pid_list = events.map { |event| event[2]['child_pid'] }.uniq
    # the number of pids should be same with number of child processes
    assert_equal 2, pid_list.size

    logs = d.instance.log.out.logs
    assert_equal 2, logs.count { |l| l.include?('child process exits with error code') }
    assert_equal 2, logs.count { |l| l.include?('respawning child process') }
  ensure
    # Always shut the plugin down; skip when driver creation itself failed so
    # the original error is not masked by a NoMethodError on nil.
    d.run(start: false, shutdown: true) if d
  end
end