require_relative '../helper' require 'fluent/test/driver/input' require 'fluent/plugin/in_monitor_agent' require 'fluent/engine' require 'fluent/config' require 'fluent/event_router' require 'fluent/supervisor' require 'net/http' require 'json' require_relative '../test_plugin_classes' class MonitorAgentInputTest < Test::Unit::TestCase include FuzzyAssert CONFIG_DIR = File.expand_path('../tmp/in_monitor_agent', __dir__) def setup Fluent::Test.setup end def create_driver(conf = '') Fluent::Test::Driver::Input.new(Fluent::Plugin::MonitorAgentInput).configure(conf) end def configure_ra(ra, conf_str) conf = Fluent::Config.parse(conf_str, "(test)", "(test_dir)", true) ra.configure(conf) ra end def test_configure d = create_driver assert_equal("0.0.0.0", d.instance.bind) assert_equal(24220, d.instance.port) assert_equal(nil, d.instance.tag) assert_equal(60, d.instance.emit_interval) assert_true d.instance.include_config end sub_test_case "collect in_monitor_agent plugin statistics" do # Input Test Driver does not register metric callbacks. # We should stub them here. class TestEventMetricRouter < Fluent::Test::Driver::TestEventRouter def initialize(driver) super raise ArgumentError, "plugin does not respond metric_callback method" unless @driver.instance.respond_to?(:metric_callback) end def emit(tag, time, record) super @driver.instance.metric_callback(OneEventStream.new(time, record)) end def emit_array(tag, array) super @driver.instance.metric_callback(ArrayEventStream.new(array)) end def emit_stream(tag, es) super @driver.instance.metric_callback(es) end end class MetricInputDriver < Fluent::Test::Driver::Input def configure(conf, syntax: :v1) if conf.is_a?(Fluent::Config::Element) @config = conf else @config = Fluent::Config.parse(conf, "(test)", "(test_dir)", syntax: syntax) end if @instance.respond_to?(:router=) @event_streams = [] @error_events = [] driver = self mojule = Module.new do define_method(:event_emitter_router) do |label_name| TestEventMetricRouter.new(driver) end end @instance.singleton_class.prepend mojule end @instance.configure(@config) self end end setup do # check @type and type in one configuration conf = <<-EOC @type test_in_gen @id test_in_gen num 10 @type test_filter @id test_filter @type relabel @id test_relabel @label @test EOC @ra = Fluent::RootAgent.new(log: $log) stub(Fluent::Engine).root_agent { @ra } @ra = configure_ra(@ra, conf) end data(:with_config_yes => true, :with_config_no => false) def test_enable_input_metrics(with_config) monitor_agent_conf = <<-CONF tag test.monitor emit_interval 1 CONF @ra.inputs.first.context_router.emit("test.event", Fluent::Engine.now, {"message":"ok"}) d = MetricInputDriver.new(Fluent::Plugin::MonitorAgentInput).configure(monitor_agent_conf) d.run(expect_emits: 1, timeout: 3) test_label = @ra.labels['@test'] error_label = @ra.labels['@ERROR'] input_info = { "output_plugin" => false, "plugin_category"=> "input", "plugin_id" => "test_in_gen", "retry_count" => nil, "type" => "test_in_gen", "emit_records" => 0, # This field is not updated due to not to be assigned metric callback. "emit_size" => 0, # Ditto. } input_info.merge!("config" => {"@id" => "test_in_gen", "@type" => "test_in_gen", "num" => "10"}) if with_config filter_info = { "output_plugin" => false, "plugin_category" => "filter", "plugin_id" => "test_filter", "retry_count" => nil, "type" => "test_filter", "emit_records" => Integer, "emit_size" => Integer, } filter_info.merge!("config" => {"@id" => "test_filter", "@type" => "test_filter"}) if with_config output_info = { "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "test_out", "retry_count" => 0, "type" => "test_out", "emit_count" => Integer, "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, } output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config error_label_info = { "buffer_queue_length" => 0, "buffer_timekeys" => [], "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "null", "retry_count" => 0, "type" => "null", "buffer_available_buffer_space_ratios" => Float, "buffer_queue_byte_size" => Integer, "buffer_stage_byte_size" => Integer, "buffer_stage_length" => Integer, "emit_count" => Integer, "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, } error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config opts = {with_config: with_config} assert_equal(input_info, d.instance.get_monitor_info(@ra.inputs.first, opts)) assert_fuzzy_equal(filter_info, d.instance.get_monitor_info(@ra.filters.first, opts)) assert_fuzzy_equal(output_info, d.instance.get_monitor_info(test_label.outputs.first, opts)) assert_fuzzy_equal(error_label_info, d.instance.get_monitor_info(error_label.outputs.first, opts)) monitor_agent_emit_info = { "emit_records" => Integer, "emit_size" => Integer, } filter_statistics_info = { "emit_records" => Integer, "emit_size" => Integer, } assert_fuzzy_equal(monitor_agent_emit_info, d.instance.statistics["input"]) assert_fuzzy_equal(filter_statistics_info, @ra.filters.first.statistics["filter"]) end end sub_test_case "collect plugin information" do setup do # check @type and type in one configuration conf = <<-EOC @type test_in @id test_in @type test_filter @id test_filter @type relabel @id test_relabel @label @test EOC @ra = Fluent::RootAgent.new(log: $log) stub(Fluent::Engine).root_agent { @ra } @ra = configure_ra(@ra, conf) end test "plugin_category" do d = create_driver test_label = @ra.labels['@test'] error_label = @ra.labels['@ERROR'] assert_equal("input", d.instance.plugin_category(@ra.inputs.first)) assert_equal("filter", d.instance.plugin_category(@ra.filters.first)) assert_equal("output", d.instance.plugin_category(test_label.outputs.first)) assert_equal("output", d.instance.plugin_category(error_label.outputs.first)) end data(:with_config_yes => true, :with_config_no => false) test "get_monitor_info" do |with_config| d = create_driver test_label = @ra.labels['@test'] error_label = @ra.labels['@ERROR'] input_info = { "output_plugin" => false, "plugin_category"=> "input", "plugin_id" => "test_in", "retry_count" => nil, "type" => "test_in", "emit_records" => 0, "emit_size" => 0, } input_info.merge!("config" => {"@id" => "test_in", "@type" => "test_in"}) if with_config filter_info = { "output_plugin" => false, "plugin_category" => "filter", "plugin_id" => "test_filter", "retry_count" => nil, "type" => "test_filter", "emit_records" => 0, "emit_size" => 0, } filter_info.merge!("config" => {"@id" => "test_filter", "@type" => "test_filter"}) if with_config output_info = { "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "test_out", "retry_count" => 0, "type" => "test_out", "emit_count" => Integer, "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, } output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config error_label_info = { "buffer_queue_length" => 0, "buffer_timekeys" => [], "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "null", "retry_count" => 0, "type" => "null", "buffer_available_buffer_space_ratios" => Float, "buffer_queue_byte_size" => Integer, "buffer_stage_byte_size" => Integer, "buffer_stage_length" => Integer, "emit_count" => Integer, "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, } error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config opts = {with_config: with_config} assert_equal(input_info, d.instance.get_monitor_info(@ra.inputs.first, opts)) assert_fuzzy_equal(filter_info, d.instance.get_monitor_info(@ra.filters.first, opts)) assert_fuzzy_equal(output_info, d.instance.get_monitor_info(test_label.outputs.first, opts)) assert_fuzzy_equal(error_label_info, d.instance.get_monitor_info(error_label.outputs.first, opts)) end test "fluentd opts" do d = create_driver filepath = nil begin FileUtils.mkdir_p(CONFIG_DIR) filepath = File.expand_path('fluentd.conf', CONFIG_DIR) FileUtils.touch(filepath) s = Fluent::Supervisor.new({config_path: filepath}) s.configure ensure FileUtils.rm_r(CONFIG_DIR) rescue _ end expected_opts = { "config_path" => filepath, "pid_file" => nil, "plugin_dirs" => ["/etc/fluent/plugin"], "log_path" => nil, "root_dir" => nil, } assert_equal(expected_opts, d.instance.fluentd_opts) end test "all_plugins" do d = create_driver plugins = [] d.instance.all_plugins.each {|plugin| plugins << plugin.class } assert_equal([FluentTest::FluentTestInput, Fluent::Plugin::RelabelOutput, FluentTest::FluentTestFilter, FluentTest::FluentTestOutput, # in label @test Fluent::Plugin::CopyOutput, FluentTest::FluentTestOutput, # in label @copy 1 FluentTest::FluentTestOutput, # in label @copy 2 Fluent::Plugin::NullOutput], plugins) end test "emit" do port = unused_port d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{port} tag monitor emit_interval 1 ") d.instance.start d.end_if do d.events.size >= 5 end d.run expect_relabel_record = { "plugin_id" => "test_relabel", "plugin_category" => "output", "type" => "relabel", "output_plugin" => true, "retry_count" => 0, "emit_count" => Integer, "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, } expect_test_out_record = { "plugin_id" => "test_out", "plugin_category" => "output", "type" => "test_out", "output_plugin" => true, "retry_count" => 0, "emit_count" => Integer, "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, } assert_fuzzy_equal(expect_relabel_record, d.events[1][2]) assert_fuzzy_equal(expect_test_out_record, d.events[3][2]) end end def get(uri, header = {}) url = URI.parse(uri) req = Net::HTTP::Get.new(url, header) unless header.has_key?('Content-Type') header['Content-Type'] = 'application/octet-stream' end Net::HTTP.start(url.host, url.port) {|http| http.request(req) } end sub_test_case "servlets" do setup do @port = unused_port # check @type and type in one configuration conf = <<-EOC @type test_in @id test_in @type monitor_agent bind "127.0.0.1" port #{@port} tag monitor @id monitor_agent @type test_filter @id test_filter @type relabel @id test_relabel @label @test EOC begin @ra = Fluent::RootAgent.new(log: $log) stub(Fluent::Engine).root_agent { @ra } @ra = configure_ra(@ra, conf) # store Supervisor instance to avoid collected by GC FileUtils.mkdir_p(CONFIG_DIR) @filepath = File.expand_path('fluentd.conf', CONFIG_DIR) File.open(@filepath, 'w') do |v| v.puts(conf) end @supervisor = Fluent::Supervisor.new({config_path: @filepath}) @supervisor.configure ensure FileUtils.rm_r(CONFIG_DIR) rescue _ end end test "/api/plugins" do d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor ") d.instance.start expected_test_in_response = "\ plugin_id:test_in\tplugin_category:input\ttype:test_in\toutput_plugin:false\tretry_count:\temit_records:0\temit_size:0" expected_test_filter_response = "\ plugin_id:test_filter\tplugin_category:filter\ttype:test_filter\toutput_plugin:false\tretry_count:\temit_records:0\temit_size:0" response = get("http://127.0.0.1:#{@port}/api/plugins").body test_in = response.split("\n")[0] test_filter = response.split("\n")[3] assert_equal(expected_test_in_response, test_in) assert_equal(expected_test_filter_response, test_filter) end data(:include_config_and_retry_yes => [true, true, "include_config yes", "include_retry yes"], :include_config_and_retry_no => [false, false, "include_config no", "include_retry no"],) test "/api/plugins.json" do |(with_config, with_retry, include_conf, retry_conf)| d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor #{include_conf} #{retry_conf} ") d.instance.start expected_test_in_response = { "output_plugin" => false, "plugin_category" => "input", "plugin_id" => "test_in", "retry_count" => nil, "type" => "test_in", "emit_records" => 0, "emit_size" => 0, } expected_test_in_response.merge!("config" => {"@id" => "test_in", "@type" => "test_in"}) if with_config expected_null_response = { "buffer_queue_length" => 0, "buffer_timekeys" => [], "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "null", "retry_count" => 0, "type" => "null", "buffer_available_buffer_space_ratios" => Float, "buffer_queue_byte_size" => Integer, "buffer_stage_byte_size" => Integer, "buffer_stage_length" => Integer, "emit_count" => Integer, "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json").body) test_in_response = response["plugins"][0] null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) assert_fuzzy_equal(expected_null_response, null_response) end test "/api/plugins.json/not_found" do d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor ") d.instance.start resp = get("http://127.0.0.1:#{@port}/api/plugins.json/not_found") assert_equal('404', resp.code) body = JSON.parse(resp.body) assert_equal(body['message'], 'Not found') end data(:with_config_and_retry_yes => [true, true, "?with_config=yes&with_retry"], :with_config_and_retry_no => [false, false, "?with_config=no&with_retry=no"]) test "/api/plugins.json with query parameter. query parameter is preferred than include_config" do |(with_config, with_retry, query_param)| d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor ") d.instance.start expected_test_in_response = { "output_plugin" => false, "plugin_category" => "input", "plugin_id" => "test_in", "retry_count" => nil, "type" => "test_in", "emit_records" => 0, "emit_size" => 0, } expected_test_in_response.merge!("config" => {"@id" => "test_in", "@type" => "test_in"}) if with_config expected_null_response = { "buffer_queue_length" => 0, "buffer_timekeys" => [], "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "null", "retry_count" => 0, "type" => "null", "buffer_available_buffer_space_ratios" => Float, "buffer_queue_byte_size" => Integer, "buffer_stage_byte_size" => Integer, "buffer_stage_length" => Integer, "emit_count" => Integer, "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json#{query_param}").body) test_in_response = response["plugins"][0] null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) assert_fuzzy_include(expected_null_response, null_response) end test "/api/plugins.json with 'with_ivars'. response contains specified instance variables of each plugin" do d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor ") d.instance.start expected_test_in_response = { "output_plugin" => false, "plugin_category" => "input", "plugin_id" => "test_in", "retry_count" => nil, "type" => "test_in", "instance_variables" => {"id" => "test_in"}, "emit_records" => 0, "emit_size" => 0, } expected_null_response = { "buffer_queue_length" => 0, "buffer_timekeys" => [], "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "null", "retry_count" => 0, "type" => "null", "instance_variables" => {"id" => "null"}, "buffer_available_buffer_space_ratios" => Float, "buffer_queue_byte_size" => Integer, "buffer_stage_byte_size" => Integer, "buffer_stage_length" => Integer, "emit_count" => Integer, "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, } response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors").body) test_in_response = response["plugins"][0] null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) assert_fuzzy_equal(expected_null_response, null_response) end test "/api/config" do d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor ") d.instance.start expected_response_regex = %r{pid:\d+\tppid:\d+\tversion:#{Fluent::VERSION}\tconfig_path:#{@filepath}\tpid_file:\tplugin_dirs:/etc/fluent/plugin\tlog_path:} assert_match(expected_response_regex, get("http://127.0.0.1:#{@port}/api/config").body) end test "/api/config.json" do d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor ") d.instance.start res = JSON.parse(get("http://127.0.0.1:#{@port}/api/config.json").body) assert_equal(@filepath, res["config_path"]) assert_nil(res["pid_file"]) assert_equal(["/etc/fluent/plugin"], res["plugin_dirs"]) assert_nil(res["log_path"]) assert_equal(Fluent::VERSION, res["version"]) end test "/api/config.json?debug=1" do d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor ") d.instance.start # To check pretty print assert_true !get("http://127.0.0.1:#{@port}/api/config.json").body.include?("\n") assert_true get("http://127.0.0.1:#{@port}/api/config.json?debug=1").body.include?("\n") end test "/api/config.json/not_found" do d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} tag monitor ") d.instance.start resp = get("http://127.0.0.1:#{@port}/api/config.json/not_found") assert_equal('404', resp.code) body = JSON.parse(resp.body) assert_equal(body['message'], 'Not found') end end sub_test_case "check retry of buffered plugins" do class FluentTestFailWriteOutput < ::Fluent::Plugin::Output ::Fluent::Plugin.register_output('test_out_fail_write', self) def write(chunk) raise "chunk error!" end end setup do @port = unused_port # check @type and type in one configuration conf = <<-EOC @type monitor_agent @id monitor_agent bind "127.0.0.1" port #{@port} @type test_out_fail_write @id test_out_fail_write timekey 1m flush_mode immediate EOC @ra = Fluent::RootAgent.new(log: $log) stub(Fluent::Engine).root_agent { @ra } @ra = configure_ra(@ra, conf) end test "/api/plugins.json retry object should be filled if flush was failed" do d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{@port} include_config no ") d.instance.start output = @ra.outputs[0] output.start output.after_start expected_test_out_fail_write_response = { "buffer_queue_length" => 1, "buffer_timekeys" => [output.calculate_timekey(event_time)], "buffer_total_queued_size" => 40, "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "test_out_fail_write", "type" => "test_out_fail_write", "buffer_newest_timekey" => output.calculate_timekey(event_time), "buffer_oldest_timekey" => output.calculate_timekey(event_time), "buffer_available_buffer_space_ratios" => Float, "buffer_queue_byte_size" => Integer, "buffer_stage_byte_size" => Integer, "buffer_stage_length" => Integer, "emit_count" => Integer, "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, "rollback_count" => Integer, 'slow_flush_count' => Integer, 'flush_time_count' => Integer, } output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]])) # flush few times to check steps 2.times do output.force_flush # output.force_flush calls #submit_flush_all, but #submit_flush_all skips to call #submit_flush_once when @retry exists. # So that forced flush in retry state should be done by calling #submit_flush_once directly. output.submit_flush_once sleep 0.1 until output.buffer.queued? end response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json").body) test_out_fail_write_response = response["plugins"][1] # remove dynamic keys response_retry_count = test_out_fail_write_response.delete("retry_count") response_retry = test_out_fail_write_response.delete("retry") assert_fuzzy_equal(expected_test_out_fail_write_response, test_out_fail_write_response) assert{ response_retry.has_key?("steps") } # it's very hard to check exact retry count (because retries are called by output flush thread scheduling) assert{ response_retry_count >= 1 && response_retry["steps"] >= 0 } assert{ response_retry_count == response_retry["steps"] + 1 } end end sub_test_case "check the port number of http server" do test "on single worker environment" do port = unused_port d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{port} ") d.instance.start assert_equal("200", get("http://127.0.0.1:#{port}/api/plugins").code) end test "worker_id = 2 on multi worker environment" do port = unused_port Fluent::SystemConfig.overwrite_system_config('workers' => 4) do d = Fluent::Test::Driver::Input.new(Fluent::Plugin::MonitorAgentInput) d.instance.instance_eval{ @_fluentd_worker_id = 2 } d.configure(" @type monitor_agent bind '127.0.0.1' port #{port - 2} ") d.instance.start end assert_equal("200", get("http://127.0.0.1:#{port}/api/plugins").code) end end sub_test_case "check NoMethodError does not happen" do class FluentTestBufferVariableOutput < ::Fluent::Plugin::Output ::Fluent::Plugin.register_output('test_out_buffer_variable', self) def configure(conf) super @buffer = [] end def write(chunk) end end class FluentTestBufferVariableFilter < ::Fluent::Plugin::Filter ::Fluent::Plugin.register_filter("test_filter_buffer_variable", self) def initialize super @buffer = {} end def filter(tag, time, record) record end end setup do conf = <<-EOC @type test_out_buffer_variable @id test_out_buffer_variable @type test_filter_buffer_variable @id test_filter_buffer_variable EOC @ra = Fluent::RootAgent.new(log: $log) stub(Fluent::Engine).root_agent { @ra } @ra = configure_ra(@ra, conf) end test "plugins have a variable named buffer does not throws NoMethodError" do port = unused_port d = create_driver(" @type monitor_agent bind '127.0.0.1' port #{port} include_config no ") d.instance.start assert_equal("200", get("http://127.0.0.1:#{port}/api/plugins.json").code) assert{ d.logs.none?{|log| log.include?("NoMethodError") } } assert_equal(false, d.instance.instance_variable_get(:@first_warn)) end end end