test/plugin/test_out_http.rb in fluent-plugin-out-http-1.3.0 vs test/plugin/test_out_http.rb in fluent-plugin-out-http-1.3.1

- old
+ new

@@ -1,771 +1,872 @@ -# -*- coding: utf-8 -*- -require 'net/http' -require 'uri' -require 'yajl' -require 'fluent/test/http_output_test' -require 'fluent/plugin/out_http' -require 'fluent/test/driver/output' -require 'fluent/test/helpers' -require_relative "./script/plugin/formatter_test" - -module OS - # ref. http://stackoverflow.com/questions/170956/how-can-i-find-which-operating-system-my-ruby-program-is-running-on - def OS.windows? - (/cygwin|mswin|mingw|bccwin|wince|emx/ =~ RUBY_PLATFORM) != nil - end - - def OS.mac? - (/darwin/ =~ RUBY_PLATFORM) != nil - end - - def OS.unix? - !OS.windows? - end - - def OS.linux? - OS.unix? and not OS.mac? - end -end - -class HTTPOutputTestBase < Test::Unit::TestCase - include Fluent::Test::Helpers - - def self.port - 5126 - end - - def self.server_config - config = {BindAddress: '127.0.0.1', Port: port} - if ENV['VERBOSE'] - logger = WEBrick::Log.new(STDOUT, WEBrick::BasicLog::DEBUG) - config[:Logger] = logger - config[:AccessLog] = [] - end - config - end - - def self.test_http_client(**opts) - opts = opts.merge(open_timeout: 1, read_timeout: 1) - Net::HTTP.start('127.0.0.1', port, **opts) - end - - # setup / teardown for servers - def setup - Fluent::Test.setup - @posts = [] - @puts = [] - @prohibited = 0 - @requests = 0 - @auth = false - @headers = {} - @dummy_server_thread = Thread.new do - srv = WEBrick::HTTPServer.new(self.class.server_config) - begin - allowed_methods = %w(POST PUT) - srv.mount_proc('/api') { |req,res| - @requests += 1 - unless allowed_methods.include? req.request_method - res.status = 405 - res.body = 'request method mismatch' - next - end - req.each do |key, value| - @headers[key] = value - end - if @auth and req.header['authorization'][0] == 'Basic YWxpY2U6c2VjcmV0IQ==' # pattern of user='alice' passwd='secret!' - # ok, authorized - # pattern of bear #{Base64.encode64('secret token!')} - elsif @auth and req.header['authorization'][0] == 'bearer c2VjcmV0IHRva2VuIQ==' - # pattern of jwt - # header: { - # "alg": "HS256", - # "typ": "JWT" - # } - # payload: { - # "iss": "Hoge Publisher", - # "sub": "Hoge User" - # } - # signature: - # HS256(base64UrlEncode(header) + "." + - # base64UrlEncode(payload) + "." + - # secret) - elsif @auth and req.header['authorization'][0] == 'jwt eyJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJIb2dlIFB1Ymxpc2hlciIsInN1YiI6IkhvZ2UgVXNlciJ9.V2NL7YgCWNt5d3vTXFrcRLpRImO2cU2JZ4mQglqw3rE' - elsif @auth - res.status = 403 - @prohibited += 1 - next - else - # ok, authorization not required - end - - record = {:auth => nil} - if req.content_type == 'application/json' - record[:json] = Yajl.load(req.body) - elsif req.content_type == 'text/plain' - puts req - record[:data] = req.body - elsif req.content_type == 'application/octet-stream' - record[:data] = req.body - elsif req.content_type == 'application/x-ndjson' - data = [] - req.body.each_line { |l| - data << Yajl.load(l) - } - record[:x_ndjson] = data - else - record[:form] = Hash[*(req.body.split('&').map{|kv|kv.split('=')}.flatten)] - end - - instance_variable_get("@#{req.request_method.downcase}s").push(record) - - res.status = 200 - } - srv.mount_proc('/modified-api') { |req,res| - res.status = 303 - res.body = 'See other' - } - srv.mount_proc('/') { |req,res| - res.status = 200 - res.body = 'running' - } - srv.start - ensure - srv.shutdown - end - end - - # to wait completion of dummy server.start() - require 'thread' - cv = ConditionVariable.new - watcher = Thread.new { - connected = false - while not connected - begin - client = self.class.test_http_client - client.request_get('/') - connected = true - rescue Errno::ECONNREFUSED - sleep 0.1 - rescue StandardError => e - p e - sleep 0.1 - end - end - cv.signal - } - mutex = Mutex.new - mutex.synchronize { - cv.wait(mutex) - } - end - - def test_dummy_server - client = self.class.test_http_client - post_header = { 'Content-Type' => 'application/x-www-form-urlencoded' } - - assert_equal '200', client.request_get('/').code - assert_equal '200', client.request_post('/api/service/metrics/hoge', 'number=1&mode=gauge', post_header).code - - assert_equal 1, @posts.size - - assert_equal '1', @posts[0][:form]['number'] - assert_equal 'gauge', @posts[0][:form]['mode'] - assert_nil @posts[0][:auth] - - assert_equal '303', client.request_get('/modified-api').code - - @auth = true - - assert_equal '403', client.request_post('/api/service/metrics/pos', 'number=30&mode=gauge', post_header).code - - req_with_auth = lambda do |number, mode, user, pass| - req = Net::HTTP::Post.new("/api/service/metrics/pos") - req.content_type = 'application/x-www-form-urlencoded' - req.basic_auth user, pass - req.set_form_data({'number'=>number, 'mode'=>mode}) - req - end - - assert_equal '403', client.request(req_with_auth.call(500, 'count', 'alice', 'wrong password!')).code - - assert_equal '403', client.request(req_with_auth.call(500, 'count', 'alice', 'wrong password!')).code - - assert_equal 1, @posts.size - - assert_equal '200', client.request(req_with_auth.call(500, 'count', 'alice', 'secret!')).code - - assert_equal 2, @posts.size - - end - - def teardown - @dummy_server_thread.kill - @dummy_server_thread.join - end - - def create_driver(conf) - Fluent::Test::Driver::Output.new(Fluent::Plugin::HTTPOutput).configure(conf) - end -end - -class HTTPOutputTest < HTTPOutputTestBase - CONFIG = %[ - endpoint_url http://127.0.0.1:#{port}/api/ - ] - - CONFIG_QUERY_PARAM = %[ - endpoint_url http://127.0.0.1:#{port}/api?foo=bar&baz=qux - ] - - CONFIG_JSON = %[ - endpoint_url http://127.0.0.1:#{port}/api/ - serializer json - ] - - CONFIG_TEXT = %[ - endpoint_url http://127.0.0.1:#{port}/api/ - serializer text - ] - - CONFIG_RAW = %[ - endpoint_url http://127.0.0.1:#{port}/api/ - serializer raw - ] - - CONFIG_PUT = %[ - endpoint_url http://127.0.0.1:#{port}/api/ - http_method put - ] - - CONFIG_HTTP_ERROR = %[ - endpoint_url https://127.0.0.1:#{port - 1}/api/ - ] - - CONFIG_HTTP_ERROR_SUPPRESSED = %[ - endpoint_url https://127.0.0.1:#{port - 1}/api/ - raise_on_error false - ] - - RATE_LIMIT_MSEC = 1200 - - CONFIG_RATE_LIMIT = %[ - endpoint_url http://127.0.0.1:#{port}/api/ - rate_limit_msec #{RATE_LIMIT_MSEC} - ] - - def test_configure - d = create_driver CONFIG - assert_equal "http://127.0.0.1:#{self.class.port}/api/", d.instance.endpoint_url - assert_equal :form, d.instance.serializer - assert_equal [503], d.instance.recoverable_status_codes - - d = create_driver CONFIG_JSON - assert_equal "http://127.0.0.1:#{self.class.port}/api/", d.instance.endpoint_url - assert_equal :json, d.instance.serializer - end - - test 'lack of tag in chunk_keys' do - assert_raise_message(/'tag' in chunk_keys is required./) do - create_driver(Fluent::Config::Element.new( - 'ROOT', '', { - '@type' => 'http', - 'endpoint_url' => "http://127.0.0.1:#{self.class.port}/api/", - 'buffered' => true, - }, [ - Fluent::Config::Element.new('buffer', 'mykey', { - 'chunk_keys' => 'mykey' - }, []) - ] - )) - end - end - - def test_emit_form - d = create_driver CONFIG - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => "\xe3\x81\x82".force_encoding("ascii-8bit") }) - end - - assert_equal 1, @posts.size - record = @posts[0] - - assert_equal '50', record[:form]['field1'] - assert_equal '20', record[:form]['field2'] - assert_equal '10', record[:form]['field3'] - assert_equal '1', record[:form]['otherfield'] - assert_equal URI.encode_www_form_component("あ").upcase, record[:form]['binary'].upcase - assert_nil record[:auth] - - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) - end - - assert_equal 2, @posts.size - end - - def test_emit_form_with_query_params - d = create_driver CONFIG_QUERY_PARAM - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => "\xe3\x81\x82".force_encoding("ascii-8bit") }) - end - - assert_equal 1, @posts.size - record = @posts[0] - - assert_equal '50', record[:form]['field1'] - assert_equal '20', record[:form]['field2'] - assert_equal '10', record[:form]['field3'] - assert_equal '1', record[:form]['otherfield'] - assert_equal URI.encode_www_form_component("あ").upcase, record[:form]['binary'].upcase - assert_nil record[:auth] - - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) - end - - assert_equal 2, @posts.size - end - - def test_emit_form_with_custom_headers - d = create_driver CONFIG + %[custom_headers {"key":"custom","token":"arbitrary"}] - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => "\xe3\x81\x82".force_encoding("ascii-8bit") }) - end - - assert_true @headers.has_key?("key") - assert_equal "custom", @headers["key"] - assert_true @headers.has_key?("token") - assert_equal "arbitrary", @headers["token"] - - assert_equal 1, @posts.size - record = @posts[0] - - assert_equal '50', record[:form]['field1'] - assert_equal '20', record[:form]['field2'] - assert_equal '10', record[:form]['field3'] - assert_equal '1', record[:form]['otherfield'] - assert_equal URI.encode_www_form_component("あ").upcase, record[:form]['binary'].upcase - assert_nil record[:auth] - - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) - end - - assert_equal 2, @posts.size - end - - class BufferedEmitTest < self - def test_emit_form - d = create_driver CONFIG + %[buffered true] - d.run(default_tag: 'test.metrics', shutdown: false) do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => "\xe3\x81\x82".force_encoding("ascii-8bit") }) - end - - assert_equal 1, @posts.size - record = @posts[0] - - assert_equal '50', record[:form]['field1'] - assert_equal '20', record[:form]['field2'] - assert_equal '10', record[:form]['field3'] - assert_equal '1', record[:form]['otherfield'] - assert_equal URI.encode_www_form_component("あ").upcase, record[:form]['binary'].upcase - assert_nil record[:auth] - - d.run(default_tag: 'test.metrics', shutdown: false) do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) - end - - assert_equal 2, @posts.size - end - - def test_emit_form_with_placeholders - d = create_driver(Fluent::Config::Element.new( - 'ROOT', '' , - {"endpoint_url" => "${endpoint}", - "buffered" => true}, - [Fluent::Config::Element.new('buffer', 'tag, endpoint', {"@type" => "memory"} ,[])])) - - d.run(default_tag: 'test.metrics', shutdown: false) do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => "\xe3\x81\x82".force_encoding("ascii-8bit"), 'endpoint' => "http://127.0.0.1:#{self.class.port}/modified-api/" }) - end - - assert_equal 0, @posts.size # post into other URI - assert_equal "http://127.0.0.1:#{self.class.port}/modified-api/", d.instance.endpoint_url - end - - def test_emit_form_put - d = create_driver CONFIG_PUT + %[buffered true] - d.run(default_tag: 'test.metrics', shutdown: false) do - d.feed({ 'field1' => 50 }) - end - - assert_equal 0, @posts.size - assert_equal 1, @puts.size - record = @puts[0] - - assert_equal '50', record[:form]['field1'] - assert_nil record[:auth] - - d.run(default_tag: 'test.metrics', shutdown: false) do - d.feed({ 'field1' => 50 }) - end - - assert_equal 0, @posts.size - assert_equal 2, @puts.size - end - - def test_emit_json - binary_string = "\xe3\x81\x82" - d = create_driver CONFIG_JSON + %[buffered true] - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => binary_string }) - end - - assert_equal 1, @posts.size - record = @posts[0] - - assert_equal 50, record[:json]['field1'] - assert_equal 20, record[:json]['field2'] - assert_equal 10, record[:json]['field3'] - assert_equal 1, record[:json]['otherfield'] - assert_equal binary_string, record[:json]['binary'] - assert_nil record[:auth] - end - - def test_emit_x_ndjson - binary_string = "\xe3\x81\x82" - d = create_driver CONFIG_JSON + %[buffered true\nbulk_request] - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => binary_string }) - d.feed({ 'field1' => 70, 'field2' => 30, 'field3' => 20, 'otherfield' => 2, 'binary' => binary_string }) - end - - assert_equal 1, @posts.size - record = @posts[0] - - expected =[ - { - "binary" => "\u3042", - "field1" => 50, - "field2" => 20, - "field3" => 10, - "otherfield" => 1 - }, - { - "binary" => "\u3042", - "field1" => 70, - "field2" => 30, - "field3" => 20, - "otherfield" => 2 - } - ] - - assert_equal expected, record[:x_ndjson] - assert_nil record[:auth] - end - end - - def test_emit_form_put - d = create_driver CONFIG_PUT - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50 }) - end - - assert_equal 0, @posts.size - assert_equal 1, @puts.size - record = @puts[0] - - assert_equal '50', record[:form]['field1'] - assert_nil record[:auth] - - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50 }) - end - - assert_equal 0, @posts.size - assert_equal 2, @puts.size - end - - def test_emit_json - binary_string = "\xe3\x81\x82" - d = create_driver CONFIG_JSON - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => binary_string }) - end - - assert_equal 1, @posts.size - record = @posts[0] - - assert_equal 50, record[:json]['field1'] - assert_equal 20, record[:json]['field2'] - assert_equal 10, record[:json]['field3'] - assert_equal 1, record[:json]['otherfield'] - assert_equal binary_string, record[:json]['binary'] - assert_nil record[:auth] - end - - def test_emit_text - binary_string = "\xe3\x81\x82" - d = create_driver CONFIG_TEXT - d.run(default_tag: 'test.metrics') do - d.feed({ "message" => "hello" }) - end - assert_equal 1, @posts.size - record = @posts[0] - assert_equal 'hello', record[:data] - assert_nil record[:auth] - end - - def test_emit_raw - binary_string = "\xe3\x81\x82" - d = create_driver CONFIG_RAW + %[format msgpack] - d.run(default_tag: 'test.metrics') do - d.feed({ "message" => "hello" }) - end - assert_equal 1, @posts.size - record = @posts[0] - assert_equal ({ "message" => "hello" }).to_msgpack, record[:data] - assert_nil record[:auth] - end - - def test_http_error_is_raised - d = create_driver CONFIG_HTTP_ERROR - assert_raise Errno::ECONNREFUSED do - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50 }) - end - end - end - - def test_http_error_is_suppressed_with_raise_on_error_false - d = create_driver CONFIG_HTTP_ERROR_SUPPRESSED - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50 }) - end - # drive asserts the next output chain is called; - # so no exception means our plugin handled the error - - assert_equal 0, @requests - end - - def test_rate_limiting - d = create_driver CONFIG_RATE_LIMIT - record = { :k => 1 } - - last_emit = _current_msec - d.run(default_tag: 'test.metrics') do - d.feed(record) - end - - assert_equal 1, @posts.size - - d.run(default_tag: 'test.metrics') do - d.feed({}) - end - assert last_emit + RATE_LIMIT_MSEC > _current_msec, "Still under rate limiting interval" - assert_equal 1, @posts.size - - wait_msec = 500 - sleep (last_emit + RATE_LIMIT_MSEC - _current_msec + wait_msec) * 0.001 - - assert last_emit + RATE_LIMIT_MSEC < _current_msec, "No longer under rate limiting interval" - d.run(default_tag: 'test.metrics') do - d.feed(record) - end - assert_equal 2, @posts.size - end - - def _current_msec - Time.now.to_f * 1000 - end - - def test_auth - @auth = true # enable authentication of dummy server - - d = create_driver(CONFIG) - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) - end # failed in background, and output warn log - - assert_equal 0, @posts.size - assert_equal 1, @prohibited - - d = create_driver(CONFIG + %[ - authentication basic - username alice - password wrong_password - ]) - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) - end # failed in background, and output warn log - - assert_equal 0, @posts.size - assert_equal 2, @prohibited - - d = create_driver(CONFIG + %[ - authentication basic - username alice - password secret! - ]) - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) - end # failed in background, and output warn log - - assert_equal 1, @posts.size - assert_equal 2, @prohibited - - require 'base64' - d = create_driver(CONFIG + %[ - authentication bearer - token #{Base64.encode64('secret token!')} - ]) - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) - end # failed in background, and output warn log - - assert_equal 2, @posts.size - assert_equal 2, @prohibited - - d = create_driver(CONFIG + %[ - authentication jwt - token eyJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJIb2dlIFB1Ymxpc2hlciIsInN1YiI6IkhvZ2UgVXNlciJ9.V2NL7YgCWNt5d3vTXFrcRLpRImO2cU2JZ4mQglqw3rE - ]) - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) - end # failed in background, and output warn log - - assert_equal 3, @posts.size - assert_equal 2, @prohibited - end - - class CustomFormatterTest < self - def test_new_config - config = Fluent::Config::Element.new( - 'ROOT', '', - {"@type" => "http", - "endpoint_url" => "http://127.0.0.1:#{self.class.port}/api/", - "serializer" => "json"}, [ - Fluent::Config::Element.new('format', '', { - "@type" => "test" - }, []) - ]) - d = create_driver config - payload = {"field" => 1} - d.run(default_tag: 'test.metrics') do - d.feed(payload) - end - - record = @posts[0] - expected = {"wrapped" => true, "record" => payload} - assert_equal expected, record[:json] - end - - def test_legacy_config - config = %[ - endpoint_url http://127.0.0.1:#{self.class.port}/api/ - serializer json - format test - ] - - d = create_driver config - payload = {"field" => 1} - d.run(default_tag: 'test.metrics') do - d.feed(payload) - end - - record = @posts[0] - expected = {"wrapped" => true, "record" => payload} - assert_equal expected, record[:json] - end - end -end - -class HTTPSOutputTest < HTTPOutputTestBase - def self.port - 5127 - end - - def self.server_config - config = super - config[:SSLEnable] = true - config[:SSLCertName] = [["CN", WEBrick::Utils::getservername]] - config - end - - def self.test_http_client - super( - use_ssl: true, - verify_mode: OpenSSL::SSL::VERIFY_NONE, - ) - end - - def test_configure - test_uri = URI.parse("https://127.0.0.1/") - - ssl_config = %[ - endpoint_url https://127.0.0.1:#{self.class.port}/api/ - ] - d = create_driver ssl_config - expected_endpoint_url = "https://127.0.0.1:#{self.class.port}/api/" - assert_equal expected_endpoint_url, d.instance.endpoint_url - http_opts = d.instance.http_opts(test_uri) - assert_equal true, http_opts[:use_ssl] - assert_equal OpenSSL::SSL::VERIFY_PEER, http_opts[:verify_mode] - - no_verify_config = %[ - endpoint_url https://127.0.0.1:#{self.class.port}/api/ - ssl_no_verify true - ] - d = create_driver no_verify_config - http_opts = d.instance.http_opts(test_uri) - assert_equal true, http_opts[:use_ssl] - assert_equal OpenSSL::SSL::VERIFY_NONE, http_opts[:verify_mode] - - cacert_file_config = %[ - endpoint_url https://127.0.0.1:#{self.class.port}/api/ - ssl_no_verify true - cacert_file /tmp/ssl.cert - ] - d = create_driver cacert_file_config - FileUtils::touch '/tmp/ssl.cert' - http_opts = d.instance.http_opts(test_uri) - assert_equal true, http_opts[:use_ssl] - assert_equal OpenSSL::SSL::VERIFY_NONE, http_opts[:verify_mode] - assert_equal true, File.file?('/tmp/ssl.cert') - puts http_opts - assert_equal File.join('/tmp/ssl.cert'), http_opts[:ca_file] - end - - def test_emit_form_ssl - config = %[ - endpoint_url https://127.0.0.1:#{self.class.port}/api/ - ssl_no_verify true - ] - d = create_driver config - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50 }) - end - - assert_equal 1, @posts.size - record = @posts[0] - - assert_equal '50', record[:form]['field1'] - end - - def test_emit_form_ssl_ca - config = %[ - endpoint_url https://127.0.0.1:#{self.class.port}/api/ - ssl_no_verify true - cacert_file /tmp/ssl.cert - ] - d = create_driver config - d.run(default_tag: 'test.metrics') do - d.feed({ 'field1' => 50 }) - end - - assert_equal 1, @posts.size - record = @posts[0] - - assert_equal '50', record[:form]['field1'] - end -end +# -*- coding: utf-8 -*- +require 'net/http' +require 'uri' +require 'yajl' +require 'fluent/test/http_output_test' +require 'fluent/plugin/out_http' +require 'fluent/test/driver/output' +require 'fluent/test/helpers' +require_relative "./script/plugin/formatter_test" + +module OS + # ref. http://stackoverflow.com/questions/170956/how-can-i-find-which-operating-system-my-ruby-program-is-running-on + def OS.windows? + (/cygwin|mswin|mingw|bccwin|wince|emx/ =~ RUBY_PLATFORM) != nil + end + + def OS.mac? + (/darwin/ =~ RUBY_PLATFORM) != nil + end + + def OS.unix? + !OS.windows? + end + + def OS.linux? + OS.unix? and not OS.mac? + end +end + +class HTTPOutputTestBase < Test::Unit::TestCase + include Fluent::Test::Helpers + + def self.port + 5126 + end + + def self.server_config + config = {BindAddress: '127.0.0.1', Port: port} + if ENV['VERBOSE'] + logger = WEBrick::Log.new(STDOUT, WEBrick::BasicLog::DEBUG) + config[:Logger] = logger + config[:AccessLog] = [] + end + config + end + + def self.test_http_client(**opts) + opts = opts.merge(open_timeout: 1, read_timeout: 1) + Net::HTTP.start('127.0.0.1', port, **opts) + end + + # setup / teardown for servers + def setup + Fluent::Test.setup + @posts = [] + @puts = [] + @prohibited = 0 + @requests = 0 + @auth = false + @headers = {} + @dummy_server_thread = Thread.new do + srv = WEBrick::HTTPServer.new(self.class.server_config) + begin + allowed_methods = %w(POST PUT) + srv.mount_proc('/api') { |req,res| + @requests += 1 + unless allowed_methods.include? req.request_method + res.status = 405 + res.body = 'request method mismatch' + next + end + req.each do |key, value| + @headers[key] = value + end + if @auth and req.header['authorization'][0] == 'Basic YWxpY2U6c2VjcmV0IQ==' # pattern of user='alice' passwd='secret!' + # ok, authorized + # pattern of bear #{Base64.encode64('secret token!')} + elsif @auth and req.header['authorization'][0] == 'bearer c2VjcmV0IHRva2VuIQ==' + # pattern of jwt + # header: { + # "alg": "HS256", + # "typ": "JWT" + # } + # payload: { + # "iss": "Hoge Publisher", + # "sub": "Hoge User" + # } + # signature: + # HS256(base64UrlEncode(header) + "." + + # base64UrlEncode(payload) + "." + + # secret) + elsif @auth and req.header['authorization'][0] == 'jwt eyJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJIb2dlIFB1Ymxpc2hlciIsInN1YiI6IkhvZ2UgVXNlciJ9.V2NL7YgCWNt5d3vTXFrcRLpRImO2cU2JZ4mQglqw3rE' + elsif @auth + res.status = 403 + @prohibited += 1 + next + else + # ok, authorization not required + end + + expander = -> (req) { + if req["Content-Encoding"] == "gzip" + StringIO.open(req.body, 'rb'){|sio| + Zlib::GzipReader.wrap(sio).read + } + else + req.body + end + } + + record = {:auth => nil} + if req.content_type == 'application/json' + record[:json] = Yajl.load(expander.call(req)) + elsif req.content_type == 'text/plain' + puts req + record[:data] = expander.call(req) + elsif req.content_type == 'application/octet-stream' + record[:data] = expander.call(req) + elsif req.content_type == 'application/x-ndjson' + data = [] + expander.call(req).each_line { |l| + data << Yajl.load(l) + } + record[:x_ndjson] = data + else + record[:form] = Hash[*(req.body.split('&').map{|kv|kv.split('=')}.flatten)] + end + + instance_variable_get("@#{req.request_method.downcase}s").push(record) + + res.status = 200 + } + srv.mount_proc('/modified-api') { |req,res| + res.status = 303 + res.body = 'See other' + } + srv.mount_proc('/') { |req,res| + res.status = 200 + res.body = 'running' + } + srv.start + ensure + srv.shutdown + end + end + + # to wait completion of dummy server.start() + require 'thread' + cv = ConditionVariable.new + watcher = Thread.new { + connected = false + while not connected + begin + client = self.class.test_http_client + client.request_get('/') + connected = true + rescue Errno::ECONNREFUSED + sleep 0.1 + rescue StandardError => e + p e + sleep 0.1 + end + end + cv.signal + } + mutex = Mutex.new + mutex.synchronize { + cv.wait(mutex) + } + end + + def test_dummy_server + client = self.class.test_http_client + post_header = { 'Content-Type' => 'application/x-www-form-urlencoded' } + + assert_equal '200', client.request_get('/').code + assert_equal '200', client.request_post('/api/service/metrics/hoge', 'number=1&mode=gauge', post_header).code + + assert_equal 1, @posts.size + + assert_equal '1', @posts[0][:form]['number'] + assert_equal 'gauge', @posts[0][:form]['mode'] + assert_nil @posts[0][:auth] + + assert_equal '303', client.request_get('/modified-api').code + + @auth = true + + assert_equal '403', client.request_post('/api/service/metrics/pos', 'number=30&mode=gauge', post_header).code + + req_with_auth = lambda do |number, mode, user, pass| + req = Net::HTTP::Post.new("/api/service/metrics/pos") + req.content_type = 'application/x-www-form-urlencoded' + req.basic_auth user, pass + req.set_form_data({'number'=>number, 'mode'=>mode}) + req + end + + assert_equal '403', client.request(req_with_auth.call(500, 'count', 'alice', 'wrong password!')).code + + assert_equal '403', client.request(req_with_auth.call(500, 'count', 'alice', 'wrong password!')).code + + assert_equal 1, @posts.size + + assert_equal '200', client.request(req_with_auth.call(500, 'count', 'alice', 'secret!')).code + + assert_equal 2, @posts.size + + end + + def teardown + @dummy_server_thread.kill + @dummy_server_thread.join + end + + def create_driver(conf) + Fluent::Test::Driver::Output.new(Fluent::Plugin::HTTPOutput).configure(conf) + end +end + +class HTTPOutputTest < HTTPOutputTestBase + CONFIG = %[ + endpoint_url http://127.0.0.1:#{port}/api/ + ] + + CONFIG_QUERY_PARAM = %[ + endpoint_url http://127.0.0.1:#{port}/api?foo=bar&baz=qux + ] + + CONFIG_JSON = %[ + endpoint_url http://127.0.0.1:#{port}/api/ + serializer json + ] + + CONFIG_TEXT = %[ + endpoint_url http://127.0.0.1:#{port}/api/ + serializer text + ] + + CONFIG_RAW = %[ + endpoint_url http://127.0.0.1:#{port}/api/ + serializer raw + ] + + CONFIG_PUT = %[ + endpoint_url http://127.0.0.1:#{port}/api/ + http_method put + ] + + CONFIG_HTTP_ERROR = %[ + endpoint_url https://127.0.0.1:#{port - 1}/api/ + ] + + CONFIG_HTTP_ERROR_SUPPRESSED = %[ + endpoint_url https://127.0.0.1:#{port - 1}/api/ + raise_on_error false + ] + + RATE_LIMIT_MSEC = 1200 + + CONFIG_RATE_LIMIT = %[ + endpoint_url http://127.0.0.1:#{port}/api/ + rate_limit_msec #{RATE_LIMIT_MSEC} + ] + + def test_configure + d = create_driver CONFIG + assert_equal "http://127.0.0.1:#{self.class.port}/api/", d.instance.endpoint_url + assert_equal :form, d.instance.serializer + assert_equal [503], d.instance.recoverable_status_codes + + d = create_driver CONFIG_JSON + assert_equal "http://127.0.0.1:#{self.class.port}/api/", d.instance.endpoint_url + assert_equal :json, d.instance.serializer + end + + test 'lack of tag in chunk_keys' do + assert_raise_message(/'tag' in chunk_keys is required./) do + create_driver(Fluent::Config::Element.new( + 'ROOT', '', { + '@type' => 'http', + 'endpoint_url' => "http://127.0.0.1:#{self.class.port}/api/", + 'buffered' => true, + }, [ + Fluent::Config::Element.new('buffer', 'mykey', { + 'chunk_keys' => 'mykey' + }, []) + ] + )) + end + end + + def test_emit_form + d = create_driver CONFIG + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => "\xe3\x81\x82".force_encoding("ascii-8bit") }) + end + + assert_equal 1, @posts.size + record = @posts[0] + + assert_equal '50', record[:form]['field1'] + assert_equal '20', record[:form]['field2'] + assert_equal '10', record[:form]['field3'] + assert_equal '1', record[:form]['otherfield'] + assert_equal URI.encode_www_form_component("あ").upcase, record[:form]['binary'].upcase + assert_nil record[:auth] + + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) + end + + assert_equal 2, @posts.size + end + + def test_emit_form_with_query_params + d = create_driver CONFIG_QUERY_PARAM + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => "\xe3\x81\x82".force_encoding("ascii-8bit") }) + end + + assert_equal 1, @posts.size + record = @posts[0] + + assert_equal '50', record[:form]['field1'] + assert_equal '20', record[:form]['field2'] + assert_equal '10', record[:form]['field3'] + assert_equal '1', record[:form]['otherfield'] + assert_equal URI.encode_www_form_component("あ").upcase, record[:form]['binary'].upcase + assert_nil record[:auth] + + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) + end + + assert_equal 2, @posts.size + end + + def test_emit_form_with_custom_headers + d = create_driver CONFIG + %[custom_headers {"key":"custom","token":"arbitrary"}] + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => "\xe3\x81\x82".force_encoding("ascii-8bit") }) + end + + assert_true @headers.has_key?("key") + assert_equal "custom", @headers["key"] + assert_true @headers.has_key?("token") + assert_equal "arbitrary", @headers["token"] + + assert_equal 1, @posts.size + record = @posts[0] + + assert_equal '50', record[:form]['field1'] + assert_equal '20', record[:form]['field2'] + assert_equal '10', record[:form]['field3'] + assert_equal '1', record[:form]['otherfield'] + assert_equal URI.encode_www_form_component("あ").upcase, record[:form]['binary'].upcase + assert_nil record[:auth] + + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) + end + + assert_equal 2, @posts.size + end + + class BufferedEmitTest < self + def test_emit_form + d = create_driver CONFIG + %[buffered true] + d.run(default_tag: 'test.metrics', shutdown: false) do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => "\xe3\x81\x82".force_encoding("ascii-8bit") }) + end + + assert_equal 1, @posts.size + record = @posts[0] + + assert_equal '50', record[:form]['field1'] + assert_equal '20', record[:form]['field2'] + assert_equal '10', record[:form]['field3'] + assert_equal '1', record[:form]['otherfield'] + assert_equal URI.encode_www_form_component("あ").upcase, record[:form]['binary'].upcase + assert_nil record[:auth] + + d.run(default_tag: 'test.metrics', shutdown: false) do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) + end + + assert_equal 2, @posts.size + end + + def test_emit_form_with_placeholders + d = create_driver(Fluent::Config::Element.new( + 'ROOT', '' , + {"endpoint_url" => "${endpoint}", + "buffered" => true}, + [Fluent::Config::Element.new('buffer', 'tag, endpoint', {"@type" => "memory"} ,[])])) + + d.run(default_tag: 'test.metrics', shutdown: false) do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => "\xe3\x81\x82".force_encoding("ascii-8bit"), 'endpoint' => "http://127.0.0.1:#{self.class.port}/modified-api/" }) + end + + assert_equal 0, @posts.size # post into other URI + assert_equal "http://127.0.0.1:#{self.class.port}/modified-api/", d.instance.endpoint_url + end + + def test_emit_form_put + d = create_driver CONFIG_PUT + %[buffered true] + d.run(default_tag: 'test.metrics', shutdown: false) do + d.feed({ 'field1' => 50 }) + end + + assert_equal 0, @posts.size + assert_equal 1, @puts.size + record = @puts[0] + + assert_equal '50', record[:form]['field1'] + assert_nil record[:auth] + + d.run(default_tag: 'test.metrics', shutdown: false) do + d.feed({ 'field1' => 50 }) + end + + assert_equal 0, @posts.size + assert_equal 2, @puts.size + end + + def test_emit_json + binary_string = "\xe3\x81\x82" + d = create_driver CONFIG_JSON + %[buffered true] + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => binary_string }) + end + + assert_equal 1, @posts.size + record = @posts[0] + + assert_equal 50, record[:json]['field1'] + assert_equal 20, record[:json]['field2'] + assert_equal 10, record[:json]['field3'] + assert_equal 1, record[:json]['otherfield'] + assert_equal binary_string, record[:json]['binary'] + assert_nil record[:auth] + end + + def test_emit_json_with_compression + binary_string = "\xe3\x81\x82" + d = create_driver CONFIG_JSON + %[buffered true\ncompress_request true] + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => binary_string }) + end + + assert_equal 1, @posts.size + record = @posts[0] + + assert_equal 50, record[:json]['field1'] + assert_equal 20, record[:json]['field2'] + assert_equal 10, record[:json]['field3'] + assert_equal 1, record[:json]['otherfield'] + assert_equal binary_string, record[:json]['binary'] + assert_nil record[:auth] + end + + def test_emit_x_ndjson + binary_string = "\xe3\x81\x82" + d = create_driver CONFIG_JSON + %[buffered true\nbulk_request] + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => binary_string }) + d.feed({ 'field1' => 70, 'field2' => 30, 'field3' => 20, 'otherfield' => 2, 'binary' => binary_string }) + end + + assert_equal 1, @posts.size + record = @posts[0] + + expected =[ + { + "binary" => "\u3042", + "field1" => 50, + "field2" => 20, + "field3" => 10, + "otherfield" => 1 + }, + { + "binary" => "\u3042", + "field1" => 70, + "field2" => 30, + "field3" => 20, + "otherfield" => 2 + } + ] + + assert_equal expected, record[:x_ndjson] + assert_nil record[:auth] + end + + def test_emit_x_ndjson_with_compression + binary_string = "\xe3\x81\x82" + d = create_driver CONFIG_JSON + %[buffered true\nbulk_request true\ncompress_request true] + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => binary_string }) + d.feed({ 'field1' => 70, 'field2' => 30, 'field3' => 20, 'otherfield' => 2, 'binary' => binary_string }) + end + + assert_equal 1, @posts.size + record = @posts[0] + + expected =[ + { + "binary" => "\u3042", + "field1" => 50, + "field2" => 20, + "field3" => 10, + "otherfield" => 1 + }, + { + "binary" => "\u3042", + "field1" => 70, + "field2" => 30, + "field3" => 20, + "otherfield" => 2 + } + ] + + assert_equal expected, record[:x_ndjson] + assert_nil record[:auth] + end + end + + def test_emit_form_put + d = create_driver CONFIG_PUT + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50 }) + end + + assert_equal 0, @posts.size + assert_equal 1, @puts.size + record = @puts[0] + + assert_equal '50', record[:form]['field1'] + assert_nil record[:auth] + + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50 }) + end + + assert_equal 0, @posts.size + assert_equal 2, @puts.size + end + + def test_emit_json + binary_string = "\xe3\x81\x82" + d = create_driver CONFIG_JSON + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => binary_string }) + end + + assert_equal 1, @posts.size + record = @posts[0] + + assert_equal 50, record[:json]['field1'] + assert_equal 20, record[:json]['field2'] + assert_equal 10, record[:json]['field3'] + assert_equal 1, record[:json]['otherfield'] + assert_equal binary_string, record[:json]['binary'] + assert_nil record[:auth] + end + + def test_emit_json_with_compression + binary_string = "\xe3\x81\x82" + d = create_driver CONFIG_JSON + %[compress_request true] + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => binary_string }) + end + + assert_equal 1, @posts.size + record = @posts[0] + + assert_equal 50, record[:json]['field1'] + assert_equal 20, record[:json]['field2'] + assert_equal 10, record[:json]['field3'] + assert_equal 1, record[:json]['otherfield'] + assert_equal binary_string, record[:json]['binary'] + assert_nil record[:auth] + end + + def test_emit_text + binary_string = "\xe3\x81\x82" + d = create_driver CONFIG_TEXT + d.run(default_tag: 'test.metrics') do + d.feed({ "message" => "hello" }) + end + assert_equal 1, @posts.size + record = @posts[0] + assert_equal 'hello', record[:data] + assert_nil record[:auth] + end + + def test_emit_text_with_compression + d = create_driver CONFIG_TEXT + %[compress_request true] + d.run(default_tag: 'test.metrics') do + d.feed({ "message" => "hello" }) + end + assert_equal 1, @posts.size + record = @posts[0] + assert_equal 'hello', record[:data] + assert_nil record[:auth] + end + + def test_emit_raw + binary_string = "\xe3\x81\x82" + d = create_driver CONFIG_RAW + %[format msgpack] + d.run(default_tag: 'test.metrics') do + d.feed({ "message" => "hello" }) + end + assert_equal 1, @posts.size + record = @posts[0] + assert_equal ({ "message" => "hello" }).to_msgpack, record[:data] + assert_nil record[:auth] + end + + def test_emit_raw_with_compression + binary_string = "\xe3\x81\x82" + d = create_driver CONFIG_RAW + %[format msgpack\ncompress_request true] + d.run(default_tag: 'test.metrics') do + d.feed({ "message" => "hello" }) + end + assert_equal 1, @posts.size + record = @posts[0] + assert_equal ({ "message" => "hello" }).to_msgpack, record[:data].force_encoding("ascii-8bit") + assert_nil record[:auth] + end + + def test_http_error_is_raised + d = create_driver CONFIG_HTTP_ERROR + assert_raise Errno::ECONNREFUSED do + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50 }) + end + end + end + + def test_http_error_is_suppressed_with_raise_on_error_false + d = create_driver CONFIG_HTTP_ERROR_SUPPRESSED + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50 }) + end + # drive asserts the next output chain is called; + # so no exception means our plugin handled the error + + assert_equal 0, @requests + end + + def test_rate_limiting + d = create_driver CONFIG_RATE_LIMIT + record = { :k => 1 } + + last_emit = _current_msec + d.run(default_tag: 'test.metrics') do + d.feed(record) + end + + assert_equal 1, @posts.size + + d.run(default_tag: 'test.metrics') do + d.feed({}) + end + assert last_emit + RATE_LIMIT_MSEC > _current_msec, "Still under rate limiting interval" + assert_equal 1, @posts.size + + wait_msec = 500 + sleep (last_emit + RATE_LIMIT_MSEC - _current_msec + wait_msec) * 0.001 + + assert last_emit + RATE_LIMIT_MSEC < _current_msec, "No longer under rate limiting interval" + d.run(default_tag: 'test.metrics') do + d.feed(record) + end + assert_equal 2, @posts.size + end + + def _current_msec + Time.now.to_f * 1000 + end + + def test_auth + @auth = true # enable authentication of dummy server + + d = create_driver(CONFIG) + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) + end # failed in background, and output warn log + + assert_equal 0, @posts.size + assert_equal 1, @prohibited + + d = create_driver(CONFIG + %[ + authentication basic + username alice + password wrong_password + ]) + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) + end # failed in background, and output warn log + + assert_equal 0, @posts.size + assert_equal 2, @prohibited + + d = create_driver(CONFIG + %[ + authentication basic + username alice + password secret! + ]) + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) + end # failed in background, and output warn log + + assert_equal 1, @posts.size + assert_equal 2, @prohibited + + require 'base64' + d = create_driver(CONFIG + %[ + authentication bearer + token #{Base64.encode64('secret token!')} + ]) + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) + end # failed in background, and output warn log + + assert_equal 2, @posts.size + assert_equal 2, @prohibited + + d = create_driver(CONFIG + %[ + authentication jwt + token eyJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJIb2dlIFB1Ymxpc2hlciIsInN1YiI6IkhvZ2UgVXNlciJ9.V2NL7YgCWNt5d3vTXFrcRLpRImO2cU2JZ4mQglqw3rE + ]) + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) + end # failed in background, and output warn log + + assert_equal 3, @posts.size + assert_equal 2, @prohibited + end + + class CustomFormatterTest < self + def test_new_config + config = Fluent::Config::Element.new( + 'ROOT', '', + {"@type" => "http", + "endpoint_url" => "http://127.0.0.1:#{self.class.port}/api/", + "serializer" => "json"}, [ + Fluent::Config::Element.new('format', '', { + "@type" => "test" + }, []) + ]) + d = create_driver config + payload = {"field" => 1} + d.run(default_tag: 'test.metrics') do + d.feed(payload) + end + + record = @posts[0] + expected = {"wrapped" => true, "record" => payload} + assert_equal expected, record[:json] + end + + def test_legacy_config + config = %[ + endpoint_url http://127.0.0.1:#{self.class.port}/api/ + serializer json + format test + ] + + d = create_driver config + payload = {"field" => 1} + d.run(default_tag: 'test.metrics') do + d.feed(payload) + end + + record = @posts[0] + expected = {"wrapped" => true, "record" => payload} + assert_equal expected, record[:json] + end + end +end + +class HTTPSOutputTest < HTTPOutputTestBase + def self.port + 5127 + end + + def self.server_config + config = super + config[:SSLEnable] = true + config[:SSLCertName] = [["CN", WEBrick::Utils::getservername]] + config + end + + def self.test_http_client + super( + use_ssl: true, + verify_mode: OpenSSL::SSL::VERIFY_NONE, + ) + end + + def test_configure + test_uri = URI.parse("https://127.0.0.1/") + + ssl_config = %[ + endpoint_url https://127.0.0.1:#{self.class.port}/api/ + ] + d = create_driver ssl_config + expected_endpoint_url = "https://127.0.0.1:#{self.class.port}/api/" + assert_equal expected_endpoint_url, d.instance.endpoint_url + http_opts = d.instance.http_opts(test_uri) + assert_equal true, http_opts[:use_ssl] + assert_equal OpenSSL::SSL::VERIFY_PEER, http_opts[:verify_mode] + + no_verify_config = %[ + endpoint_url https://127.0.0.1:#{self.class.port}/api/ + ssl_no_verify true + ] + d = create_driver no_verify_config + http_opts = d.instance.http_opts(test_uri) + assert_equal true, http_opts[:use_ssl] + assert_equal OpenSSL::SSL::VERIFY_NONE, http_opts[:verify_mode] + + cacert_file_config = %[ + endpoint_url https://127.0.0.1:#{self.class.port}/api/ + ssl_no_verify true + cacert_file /tmp/ssl.cert + ] + d = create_driver cacert_file_config + FileUtils::touch '/tmp/ssl.cert' + http_opts = d.instance.http_opts(test_uri) + assert_equal true, http_opts[:use_ssl] + assert_equal OpenSSL::SSL::VERIFY_NONE, http_opts[:verify_mode] + assert_equal true, File.file?('/tmp/ssl.cert') + puts http_opts + assert_equal File.join('/tmp/ssl.cert'), http_opts[:ca_file] + end + + def test_emit_form_ssl + config = %[ + endpoint_url https://127.0.0.1:#{self.class.port}/api/ + ssl_no_verify true + ] + d = create_driver config + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50 }) + end + + assert_equal 1, @posts.size + record = @posts[0] + + assert_equal '50', record[:form]['field1'] + end + + def test_emit_form_ssl_ca + config = %[ + endpoint_url https://127.0.0.1:#{self.class.port}/api/ + ssl_no_verify true + cacert_file /tmp/ssl.cert + ] + d = create_driver config + d.run(default_tag: 'test.metrics') do + d.feed({ 'field1' => 50 }) + end + + assert_equal 1, @posts.size + record = @posts[0] + + assert_equal '50', record[:form]['field1'] + end +end