test/plugin/test_out_http.rb in fluent-plugin-out-http-0.1.4 vs test/plugin/test_out_http.rb in fluent-plugin-out-http-0.2.0

- old
+ new

@@ -1,31 +1,43 @@ # -*- coding: utf-8 -*- +require 'net/http' require 'uri' require 'yajl' require 'fluent/test/http_output_test' require 'fluent/plugin/out_http' -TEST_LISTEN_PORT = 5126 +class HTTPOutputTestBase < Test::Unit::TestCase + def self.port + 5126 + end + def self.server_config + config = {BindAddress: '127.0.0.1', Port: port} + if ENV['VERBOSE'] + logger = WEBrick::Log.new(STDOUT, WEBrick::BasicLog::DEBUG) + config[:Logger] = logger + config[:AccessLog] = [] + end + config + end -class HTTPOutputTestBase < Test::Unit::TestCase + def self.test_http_client(**opts) + opts = opts.merge(open_timeout: 1, read_timeout: 1) + Net::HTTP.start('127.0.0.1', port, **opts) + end + # setup / teardown for servers def setup Fluent::Test.setup @posts = [] @puts = [] @prohibited = 0 @requests = 0 @auth = false @dummy_server_thread = Thread.new do - srv = if ENV['VERBOSE'] - WEBrick::HTTPServer.new({:BindAddress => '127.0.0.1', :Port => TEST_LISTEN_PORT}) - else - logger = WEBrick::Log.new('/dev/null', WEBrick::BasicLog::DEBUG) - WEBrick::HTTPServer.new({:BindAddress => '127.0.0.1', :Port => TEST_LISTEN_PORT, :Logger => logger, :AccessLog => []}) - end + srv = WEBrick::HTTPServer.new(self.class.server_config) begin allowed_methods = %w(POST PUT) srv.mount_proc('/api/') { |req,res| @requests += 1 unless allowed_methods.include? req.request_method @@ -69,48 +81,48 @@ cv = ConditionVariable.new watcher = Thread.new { connected = false while not connected begin - get_content('localhost', TEST_LISTEN_PORT, '/') + client = self.class.test_http_client + client.request_get('/') connected = true rescue Errno::ECONNREFUSED sleep 0.1 rescue StandardError => e p e sleep 0.1 end end cv.signal - } + } mutex = Mutex.new mutex.synchronize { cv.wait(mutex) - } + } end def test_dummy_server - host = '127.0.0.1' - port = TEST_LISTEN_PORT - client = Net::HTTP.start(host, port) + client = self.class.test_http_client + post_header = { 'Content-Type' => 'application/x-www-form-urlencoded' } assert_equal '200', client.request_get('/').code - assert_equal '200', client.request_post('/api/service/metrics/hoge', 'number=1&mode=gauge').code + assert_equal '200', client.request_post('/api/service/metrics/hoge', 'number=1&mode=gauge', post_header).code assert_equal 1, @posts.size assert_equal '1', @posts[0][:form]['number'] assert_equal 'gauge', @posts[0][:form]['mode'] assert_nil @posts[0][:auth] @auth = true - assert_equal '403', client.request_post('/api/service/metrics/pos', 'number=30&mode=gauge').code + assert_equal '403', client.request_post('/api/service/metrics/pos', 'number=30&mode=gauge', post_header).code req_with_auth = lambda do |number, mode, user, pass| - url = URI.parse("http://#{host}:#{port}/api/service/metrics/pos") - req = Net::HTTP::Post.new(url.path) + req = Net::HTTP::Post.new("/api/service/metrics/pos") + req.content_type = 'application/x-www-form-urlencoded' req.basic_auth user, pass req.set_form_data({'number'=>number, 'mode'=>mode}) req end @@ -128,59 +140,59 @@ def teardown @dummy_server_thread.kill @dummy_server_thread.join end + + def create_driver(conf, tag='test.metrics') + Fluent::Test::OutputTestDriver.new(Fluent::HTTPOutput, tag).configure(conf) + end end class HTTPOutputTest < HTTPOutputTestBase CONFIG = %[ - endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/api/ + endpoint_url http://127.0.0.1:#{port}/api/ ] CONFIG_JSON = %[ - endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/api/ + endpoint_url http://127.0.0.1:#{port}/api/ serializer json ] CONFIG_PUT = %[ - endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/api/ + endpoint_url http://127.0.0.1:#{port}/api/ http_method put ] CONFIG_HTTP_ERROR = %[ - endpoint_url https://127.0.0.1:#{TEST_LISTEN_PORT + 1}/api/ + endpoint_url https://127.0.0.1:#{port - 1}/api/ ] CONFIG_HTTP_ERROR_SUPPRESSED = %[ - endpoint_url https://127.0.0.1:#{TEST_LISTEN_PORT + 1}/api/ + endpoint_url https://127.0.0.1:#{port - 1}/api/ raise_on_error false ] RATE_LIMIT_MSEC = 1200 CONFIG_RATE_LIMIT = %[ - endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/api/ + endpoint_url http://127.0.0.1:#{port}/api/ rate_limit_msec #{RATE_LIMIT_MSEC} ] - def create_driver(conf=CONFIG, tag='test.metrics') - Fluent::Test::OutputTestDriver.new(Fluent::HTTPOutput, tag).configure(conf) - end - def test_configure - d = create_driver - assert_equal "http://127.0.0.1:#{TEST_LISTEN_PORT}/api/", d.instance.endpoint_url + d = create_driver CONFIG + assert_equal "http://127.0.0.1:#{self.class.port}/api/", d.instance.endpoint_url assert_equal :form, d.instance.serializer d = create_driver CONFIG_JSON - assert_equal "http://127.0.0.1:#{TEST_LISTEN_PORT}/api/", d.instance.endpoint_url + assert_equal "http://127.0.0.1:#{self.class.port}/api/", d.instance.endpoint_url assert_equal :json, d.instance.serializer end def test_emit_form - d = create_driver + d = create_driver CONFIG d.emit({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => "\xe3\x81\x82".force_encoding("ascii-8bit") }) d.run assert_equal 1, @posts.size record = @posts[0] @@ -216,11 +228,11 @@ assert_equal 0, @posts.size assert_equal 2, @puts.size end def test_emit_json - binary_string = "\xe3\x81\x82".force_encoding("ascii-8bit") + binary_string = "\xe3\x81\x82" d = create_driver CONFIG_JSON d.emit({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => binary_string }) d.run assert_equal 1, @posts.size @@ -266,21 +278,21 @@ assert last_emit + RATE_LIMIT_MSEC > _current_msec, "Still under rate limiting interval" assert_equal 1, @posts.size wait_msec = 500 sleep (last_emit + RATE_LIMIT_MSEC - _current_msec + wait_msec) * 0.001 - + assert last_emit + RATE_LIMIT_MSEC < _current_msec, "No longer under rate limiting interval" d.emit(record) d.run assert_equal 2, @posts.size end def _current_msec Time.now.to_f * 1000 end - + def test_auth @auth = true # enable authentication of dummy server d = create_driver(CONFIG, 'test.metrics') d.emit({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 }) @@ -310,6 +322,64 @@ assert_equal 1, @posts.size assert_equal 2, @prohibited end +end + +class HTTPSOutputTest < HTTPOutputTestBase + def self.port + 5127 + end + + def self.server_config + config = super + config[:SSLEnable] = true + config[:SSLCertName] = [["CN", WEBrick::Utils::getservername]] + config + end + + def self.test_http_client + super( + use_ssl: true, + verify_mode: OpenSSL::SSL::VERIFY_NONE, + ) + end + + def test_configure + test_uri = URI.parse("https://127.0.0.1/") + + ssl_config = %[ + endpoint_url https://127.0.0.1:#{self.class.port}/api/ + ] + d = create_driver ssl_config + expected_endpoint_url = "https://127.0.0.1:#{self.class.port}/api/" + assert_equal expected_endpoint_url, d.instance.endpoint_url + http_opts = d.instance.http_opts(test_uri) + assert_equal true, http_opts[:use_ssl] + assert_equal OpenSSL::SSL::VERIFY_PEER, http_opts[:verify_mode] + + no_verify_config = %[ + endpoint_url https://127.0.0.1:#{self.class.port}/api/ + ssl_no_verify true + ] + d = create_driver no_verify_config + http_opts = d.instance.http_opts(test_uri) + assert_equal true, http_opts[:use_ssl] + assert_equal OpenSSL::SSL::VERIFY_NONE, http_opts[:verify_mode] + end + + def test_emit_form_ssl + config = %[ + endpoint_url https://127.0.0.1:#{self.class.port}/api/ + ssl_no_verify true + ] + d = create_driver config + d.emit({ 'field1' => 50 }) + d.run + + assert_equal 1, @posts.size + record = @posts[0] + + assert_equal '50', record[:form]['field1'] + end end