require_relative '../helper' require 'fluent/test/driver/input' require 'fluent/plugin/in_monitor_agent' require 'fluent/engine' require 'fluent/config' require 'fluent/event_router' require 'fluent/supervisor' require 'net/http' require 'json' require_relative '../test_plugin_classes' class MonitorAgentInputTest < Test::Unit::TestCase def setup Fluent::Test.setup end def create_driver(conf = '') Fluent::Test::Driver::Input.new(Fluent::Plugin::MonitorAgentInput).configure(conf) end def configure_ra(ra, conf_str) conf = Fluent::Config.parse(conf_str, "(test)", "(test_dir)", true) ra.configure(conf) ra end def test_configure d = create_driver assert_equal("0.0.0.0", d.instance.bind) assert_equal(24220, d.instance.port) assert_equal(nil, d.instance.tag) assert_equal(60, d.instance.emit_interval) assert_true d.instance.include_config end sub_test_case "collect plugin information" do setup do # check @type and type in one configuration conf = <<-EOC @type test_in @id test_in @type test_filter @id test_filter @type relabel @id test_relabel @label @test EOC @ra = Fluent::RootAgent.new(log: $log) stub(Fluent::Engine).root_agent { @ra } @ra = configure_ra(@ra, conf) end test "plugin_category" do d = create_driver test_label = @ra.labels['@test'] error_label = @ra.labels['@ERROR'] assert_equal("input", d.instance.plugin_category(@ra.inputs.first)) assert_equal("filter", d.instance.plugin_category(@ra.filters.first)) assert_equal("output", d.instance.plugin_category(test_label.outputs.first)) assert_equal("output", d.instance.plugin_category(error_label.outputs.first)) end data(:with_config_yes => true, :with_config_no => false) test "get_monitor_info" do |with_config| d = create_driver test_label = @ra.labels['@test'] error_label = @ra.labels['@ERROR'] input_info = { "output_plugin" => false, "plugin_category"=> "input", "plugin_id" => "test_in", "retry_count" => nil, "type" => "test_in" } input_info.merge!("config" => {"@id" => "test_in", "@type" => "test_in"}) if with_config filter_info = { "output_plugin" => false, "plugin_category" => "filter", "plugin_id" => "test_filter", "retry_count" => nil, "type" => "test_filter" } filter_info.merge!("config" => {"@id" => "test_filter", "@type" => "test_filter"}) if with_config output_info = { "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "test_out", "retry_count" => 0, "type" => "test_out" } output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config error_label_info = { "buffer_queue_length" => 0, "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "null", "retry_count" => 0, "type" => "null" } error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config opts = {with_config: with_config} assert_equal(input_info, d.instance.get_monitor_info(@ra.inputs.first, opts)) assert_equal(filter_info, d.instance.get_monitor_info(@ra.filters.first, opts)) assert_equal(output_info, d.instance.get_monitor_info(test_label.outputs.first, opts)) assert_equal(error_label_info, d.instance.get_monitor_info(error_label.outputs.first, opts)) end test "fluentd opts" do d = create_driver opts = Fluent::Supervisor.default_options Fluent::Supervisor.new(opts) expected_opts = { "config_path" => "/etc/fluent/fluent.conf", "pid_file" => nil, "plugin_dirs" => ["/etc/fluent/plugin"], "log_path" => nil, "root_dir" => nil, } assert_equal(expected_opts, d.instance.fluentd_opts) end test "all_plugins" do d = create_driver plugins = [] d.instance.all_plugins.each {|plugin| plugins << plugin.class } assert_equal([FluentTest::FluentTestInput, Fluent::Plugin::RelabelOutput, FluentTest::FluentTestFilter, FluentTest::FluentTestOutput, # in label @test Fluent::Plugin::CopyOutput, FluentTest::FluentTestOutput, # in label @copy 1 FluentTest::FluentTestOutput, # in label @copy 2 Fluent::Plugin::NullOutput], plugins) end test "emit" do port = unused_port d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{port} tag monitor emit_interval 1 ") d.instance.start d.end_if do d.events.size >= 5 end d.run expect_relabel_record = { "plugin_id" => "test_relabel", "plugin_category" => "output", "type" => "relabel", "output_plugin" => true, "retry_count" => 0} expect_test_out_record = { "plugin_id" => "test_out", "plugin_category" => "output", "type" => "test_out", "output_plugin" => true, "retry_count" => 0 } assert_equal(expect_relabel_record, d.events[1][2]) assert_equal(expect_test_out_record, d.events[3][2]) end end def get(uri, header = {}) url = URI.parse(uri) req = Net::HTTP::Get.new(url, header) unless header.has_key?('Content-Type') header['Content-Type'] = 'application/octet-stream' end res = Net::HTTP.start(url.host, url.port) {|http| http.request(req) } res.body end sub_test_case "servlets" do setup do @port = unused_port # check @type and type in one configuration conf = <<-EOC @type test_in @id test_in @type monitor_agent bind "127.0.0.1" port #{@port} tag monitor @id monitor_agent @type test_filter @id test_filter @type relabel @id test_relabel @label @test EOC @ra = Fluent::RootAgent.new(log: $log) stub(Fluent::Engine).root_agent { @ra } @ra = configure_ra(@ra, conf) # store Supervisor instance to avoid collected by GC @supervisor = Fluent::Supervisor.new(Fluent::Supervisor.default_options) end test "/api/plugins" do d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor ") d.instance.start expected_test_in_response = "\ plugin_id:test_in\tplugin_category:input\ttype:test_in\toutput_plugin:false\tretry_count:" expected_test_filter_response = "\ plugin_id:test_filter\tplugin_category:filter\ttype:test_filter\toutput_plugin:false\tretry_count:" response = get("http://127.0.0.1:#{@port}/api/plugins") test_in = response.split("\n")[0] test_filter = response.split("\n")[3] assert_equal(expected_test_in_response, test_in) assert_equal(expected_test_filter_response, test_filter) end data(:include_config_and_retry_yes => [true, true, "include_config yes", "include_retry yes"], :include_config_and_retry_no => [false, false, "include_config no", "include_retry no"],) test "/api/plugins.json" do |(with_config, with_retry, include_conf, retry_conf)| d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor #{include_conf} #{retry_conf} ") d.instance.start expected_test_in_response = { "output_plugin" => false, "plugin_category" => "input", "plugin_id" => "test_in", "retry_count" => nil, "type" => "test_in" } expected_test_in_response.merge!("config" => {"@id" => "test_in", "@type" => "test_in"}) if with_config expected_null_response = { "buffer_queue_length" => 0, "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "null", "retry_count" => 0, "type" => "null" } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json")) test_in_response = response["plugins"][0] null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) assert_equal(expected_null_response, null_response) end data(:with_config_and_retry_yes => [true, true, "?with_config=yes&with_retry"], :with_config_and_retry_no => [false, false, "?with_config=no&with_retry=no"]) test "/api/plugins.json with query parameter. query parameter is preferred than include_config" do |(with_config, with_retry, query_param)| d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor ") d.instance.start expected_test_in_response = { "output_plugin" => false, "plugin_category" => "input", "plugin_id" => "test_in", "retry_count" => nil, "type" => "test_in" } expected_test_in_response.merge!("config" => {"@id" => "test_in", "@type" => "test_in"}) if with_config expected_null_response = { "buffer_queue_length" => 0, "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "null", "retry_count" => 0, "type" => "null" } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json#{query_param}")) test_in_response = response["plugins"][0] null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) assert_equal(expected_null_response, null_response) end test "/api/config" do d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor ") d.instance.start expected_response_regex = /pid:\d+\tppid:\d+\tconfig_path:\/etc\/fluent\/fluent.conf\tpid_file:\tplugin_dirs:\[\"\/etc\/fluent\/plugin\"\]\tlog_path:/ assert_match(expected_response_regex, get("http://127.0.0.1:#{@port}/api/config")) end test "/api/config.json" do d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor ") d.instance.start res = JSON.parse(get("http://127.0.0.1:#{@port}/api/config.json")) assert_equal("/etc/fluent/fluent.conf", res["config_path"]) assert_nil(res["pid_file"]) assert_equal(["/etc/fluent/plugin"], res["plugin_dirs"]) assert_nil(res["log_path"]) end end sub_test_case "check retry of buffered plugins" do class FluentTestFailWriteOutput < ::Fluent::Plugin::Output ::Fluent::Plugin.register_output('test_out_fail_write', self) def write(chunk) raise "chunk error!" end end setup do @port = unused_port # check @type and type in one configuration conf = <<-EOC @type monitor_agent @id monitor_agent bind "127.0.0.1" port #{@port} @type test_out_fail_write @id test_out_fail_write flush_mode immediate EOC @ra = Fluent::RootAgent.new(log: $log) stub(Fluent::Engine).root_agent { @ra } @ra = configure_ra(@ra, conf) end test "/api/plugins.json retry object should be filled if flush was failed" do d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} include_config no ") d.instance.start expected_test_out_fail_write_response = { "buffer_queue_length" => 1, "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "test_out_fail_write", "retry_count" => 2, "type" => "test_out_fail_write", "retry" => { "steps" => 1 } } output = @ra.outputs[0] output.start output.after_start output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]])) # flush few times to check steps 2.times do output.force_flush end response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json")) test_out_fail_write_response = response["plugins"][1] # remove dynamic keys ["start", "next_time"].each { |key| test_out_fail_write_response["retry"].delete(key) } assert_equal(expected_test_out_fail_write_response, test_out_fail_write_response) end end end