require_relative "../helper" require 'fluent/test/driver/output' require 'fluent/plugin/out_http' require 'webrick' require 'webrick/https' require 'net/http' require 'uri' require 'json' # WEBrick's ProcHandler doesn't handle PUT by default module WEBrick::HTTPServlet class ProcHandler < AbstractServlet alias do_PUT do_GET end end class HTTPOutputTest < Test::Unit::TestCase include Fluent::Test::Helpers TMP_DIR = File.join(__dir__, "../tmp/out_http#{ENV['TEST_ENV_NUMBER']}") DEFAULT_LOGGER = ::WEBrick::Log.new(::STDOUT, ::WEBrick::BasicLog::FATAL) class << self # Use class variable to reduce server start/shutdown time def startup @@result = nil @@auth_handler = nil @@http_server_thread = nil end def shutdown @@http_server_thread.kill @@http_server_thread.join rescue end end def server_port 19880 end def base_endpoint "http://127.0.0.1:#{server_port}" end def server_config config = {BindAddress: '127.0.0.1', Port: server_port} # Suppress webrick logs config[:Logger] = DEFAULT_LOGGER config[:AccessLog] = [] config end def http_client(**opts, &block) opts = opts.merge(open_timeout: 1, read_timeout: 1) if block_given? Net::HTTP.start('127.0.0.1', server_port, **opts, &block) else Net::HTTP.start('127.0.0.1', server_port, **opts) end end def run_http_server server = ::WEBrick::HTTPServer.new(server_config) server.mount_proc('/test') { |req, res| if @@auth_handler @@auth_handler.call(req, res) end @@result.method = req.request_method @@result.content_type = req.content_type req.each do |key, value| @@result.headers[key] = value end data = [] case req.content_type when 'application/x-ndjson' req.body.each_line { |l| data << JSON.parse(l) } when 'application/json' data = JSON.parse(req.body) when 'text/plain' # Use single_value in this test req.body.each_line { |line| data << line.chomp } else data << req.body end @@result.data = data res.status = 200 res.body = "success" } server.mount_proc('/503') { |_, res| res.status = 503 res.body = 'Service Unavailable' } server.mount_proc('/404') { |_, res| res.status = 404 res.body = 'Not Found' } # For start check server.mount_proc('/') { |_, res| res.status = 200 res.body = 'Hello Fluentd!' } server.start ensure server.shutdown rescue nil end Result = Struct.new("Result", :method, :content_type, :headers, :data) setup do Fluent::Test.setup FileUtils.rm_rf(TMP_DIR) @@result = Result.new(nil, nil, {}, nil) @@http_server_thread ||= Thread.new do run_http_server end now = Time.now started = false until started raise "Server not started" if (Time.now - now > 10.0) begin http_client { |c| c.request_get('/') } started = true rescue sleep 0.5 end end end teardown do @@result = nil @@auth_handler = nil end def create_driver(conf) Fluent::Test::Driver::Output.new(Fluent::Plugin::HTTPOutput).configure(conf) end def config %[ endpoint #{base_endpoint}/test ] end def test_events [ {"message" => "hello", "num" => 10, "bool" => true}, {"message" => "hello", "num" => 11, "bool" => false} ] end def test_configure d = create_driver(config) assert_equal "http://127.0.0.1:#{server_port}/test", d.instance.endpoint assert_equal :post, d.instance.http_method assert_equal 'application/x-ndjson', d.instance.content_type assert_equal [503], d.instance.retryable_response_codes assert_true d.instance.error_response_as_unrecoverable assert_nil d.instance.proxy assert_nil d.instance.headers end def test_configure_with_warn d = create_driver(config) assert_match(/Status code 503 is going to be removed/, d.instance.log.out.logs.join) end def test_configure_without_warn d = create_driver(<<~CONFIG) endpoint #{base_endpoint}/test retryable_response_codes [503] CONFIG assert_not_match(/Status code 503 is going to be removed/, d.instance.log.out.logs.join) end # Check if an exception is raised on not JSON format use data('not_json' => 'msgpack') def test_configure_with_json_array_err(format_type) assert_raise(Fluent::ConfigError) do create_driver(config + %[ json_array true @type #{format_type} ]) end end data('json' => ['json', 'application/x-ndjson'], 'ltsv' => ['ltsv', 'text/tab-separated-values'], 'msgpack' => ['msgpack', 'application/x-msgpack'], 'single_value' => ['single_value', 'text/plain']) def test_configure_content_type(types) format_type, content_type = types d = create_driver(config + %[ @type #{format_type} ]) assert_equal content_type, d.instance.content_type end # Check that json_array setting sets content_type = application/json data('json' => 'application/json') def test_configure_content_type_json_array(content_type) d = create_driver(config + "json_array true") assert_equal content_type, d.instance.content_type end data('PUT' => 'put', 'POST' => 'post') def test_write_with_method(method) d = create_driver(config + "http_method #{method}") d.run(default_tag: 'test.http') do test_events.each { |event| d.feed(event) } end result = @@result assert_equal method.upcase, result.method assert_equal 'application/x-ndjson', result.content_type assert_equal test_events, result.data assert_not_empty result.headers end # Check that JSON at HTTP request body is valid def test_write_with_json_array_setting d = create_driver(config + "json_array true") d.run(default_tag: 'test.http') do test_events.each { |event| d.feed(event) } end result = @@result assert_equal 'application/json', result.content_type assert_equal test_events, result.data assert_not_empty result.headers end def test_write_with_single_value_format d = create_driver(config + %[ @type single_value ]) d.run(default_tag: 'test.http') do test_events.each { |event| d.feed(event) } end result = @@result assert_equal 'text/plain', result.content_type assert_equal (test_events.map { |e| e['message'] }), result.data assert_not_empty result.headers end def test_write_with_headers d = create_driver(config + 'headers {"test_header":"fluentd!"}') d.run(default_tag: 'test.http') do test_events.each { |event| d.feed(event) } end result = @@result assert_true result.headers.has_key?('test_header') assert_equal "fluentd!", result.headers['test_header'] end def test_write_with_headers_from_placeholders d = create_driver(config + %[ headers_from_placeholders {"x-test":"${$.foo.bar}-test","x-tag":"${tag}"} ]) d.run(default_tag: 'test.http') do test_events.each { |event| ev = event.dup ev['foo'] = {'bar' => 'abcd'} d.feed(ev) } end result = @@result assert_equal "abcd-test", result.headers['x-test'] assert_equal "test.http", result.headers['x-tag'] end def test_write_with_retryable_response old_report_on_exception = Thread.report_on_exception Thread.report_on_exception = false # thread finished as invalid state since RetryableResponse raises. d = create_driver("endpoint #{base_endpoint}/503") assert_raise(Fluent::Plugin::HTTPOutput::RetryableResponse) do d.run(default_tag: 'test.http', shutdown: false) do test_events.each { |event| d.feed(event) } end end d.instance_shutdown(log: $log) ensure Thread.report_on_exception = old_report_on_exception end def test_write_with_disabled_unrecoverable d = create_driver(%[ endpoint #{base_endpoint}/404 error_response_as_unrecoverable false ]) d.run(default_tag: 'test.http', shutdown: false) do test_events.each { |event| d.feed(event) } end assert_match(/got error response from.*404 Not Found Not Found/, d.instance.log.out.logs.join) d.instance_shutdown end sub_test_case 'basic auth' do setup do FileUtils.mkdir_p(TMP_DIR) htpd = WEBrick::HTTPAuth::Htpasswd.new(File.join(TMP_DIR, 'dot.htpasswd')) htpd.set_passwd(nil, 'test', 'hey') authenticator = WEBrick::HTTPAuth::BasicAuth.new(:UserDB => htpd, :Realm => 'test', :Logger => DEFAULT_LOGGER) @@auth_handler = Proc.new { |req, res| authenticator.authenticate(req, res) } end teardown do FileUtils.rm_rf(TMP_DIR) end def server_port 19881 end def test_basic_auth d = create_driver(config + %[ method basic username test password hey ]) d.run(default_tag: 'test.http') do test_events.each { |event| d.feed(event) } end result = @@result assert_equal 'POST', result.method assert_equal 'application/x-ndjson', result.content_type assert_equal test_events, result.data assert_not_empty result.headers end # This test includes `error_response_as_unrecoverable true` behaviour check def test_basic_auth_with_invalid_auth d = create_driver(config + %[ method basic username ayaya password hello? ]) d.run(default_tag: 'test.http', shutdown: false) do test_events.each { |event| d.feed(event) } end assert_match(/got unrecoverable error/, d.instance.log.out.logs.join) d.instance_shutdown end end sub_test_case 'HTTPS' do def server_port 19882 end def server_config config = super # WEBrick supports self-generated self-signed certificate config[:SSLEnable] = true config[:SSLCertName] = [["CN", WEBrick::Utils::getservername]] config end def http_client(&block) super(use_ssl: true, verify_mode: OpenSSL::SSL::VERIFY_NONE, &block) end def test_write_with_https d = create_driver(%[ endpoint https://127.0.0.1:#{server_port}/test tls_verify_mode none ssl_timeout 2s ]) d.run(default_tag: 'test.http') do test_events.each { |event| d.feed(event) } end result = @@result assert_equal 'POST', result.method assert_equal 'application/x-ndjson', result.content_type assert_equal test_events, result.data assert_not_empty result.headers end end end