test/plugin/test_out_http.rb in fluent-plugin-out-http-0.1.4 vs test/plugin/test_out_http.rb in fluent-plugin-out-http-0.2.0
- old
+ new
@@ -1,31 +1,43 @@
# -*- coding: utf-8 -*-
+require 'net/http'
require 'uri'
require 'yajl'
require 'fluent/test/http_output_test'
require 'fluent/plugin/out_http'
-TEST_LISTEN_PORT = 5126
+class HTTPOutputTestBase < Test::Unit::TestCase
+ def self.port
+ 5126
+ end
+ def self.server_config
+ config = {BindAddress: '127.0.0.1', Port: port}
+ if ENV['VERBOSE']
+ logger = WEBrick::Log.new(STDOUT, WEBrick::BasicLog::DEBUG)
+ config[:Logger] = logger
+ config[:AccessLog] = []
+ end
+ config
+ end
-class HTTPOutputTestBase < Test::Unit::TestCase
+ def self.test_http_client(**opts)
+ opts = opts.merge(open_timeout: 1, read_timeout: 1)
+ Net::HTTP.start('127.0.0.1', port, **opts)
+ end
+
# setup / teardown for servers
def setup
Fluent::Test.setup
@posts = []
@puts = []
@prohibited = 0
@requests = 0
@auth = false
@dummy_server_thread = Thread.new do
- srv = if ENV['VERBOSE']
- WEBrick::HTTPServer.new({:BindAddress => '127.0.0.1', :Port => TEST_LISTEN_PORT})
- else
- logger = WEBrick::Log.new('/dev/null', WEBrick::BasicLog::DEBUG)
- WEBrick::HTTPServer.new({:BindAddress => '127.0.0.1', :Port => TEST_LISTEN_PORT, :Logger => logger, :AccessLog => []})
- end
+ srv = WEBrick::HTTPServer.new(self.class.server_config)
begin
allowed_methods = %w(POST PUT)
srv.mount_proc('/api/') { |req,res|
@requests += 1
unless allowed_methods.include? req.request_method
@@ -69,48 +81,48 @@
cv = ConditionVariable.new
watcher = Thread.new {
connected = false
while not connected
begin
- get_content('localhost', TEST_LISTEN_PORT, '/')
+ client = self.class.test_http_client
+ client.request_get('/')
connected = true
rescue Errno::ECONNREFUSED
sleep 0.1
rescue StandardError => e
p e
sleep 0.1
end
end
cv.signal
- }
+ }
mutex = Mutex.new
mutex.synchronize {
cv.wait(mutex)
- }
+ }
end
def test_dummy_server
- host = '127.0.0.1'
- port = TEST_LISTEN_PORT
- client = Net::HTTP.start(host, port)
+ client = self.class.test_http_client
+ post_header = { 'Content-Type' => 'application/x-www-form-urlencoded' }
assert_equal '200', client.request_get('/').code
- assert_equal '200', client.request_post('/api/service/metrics/hoge', 'number=1&mode=gauge').code
+ assert_equal '200', client.request_post('/api/service/metrics/hoge', 'number=1&mode=gauge', post_header).code
assert_equal 1, @posts.size
assert_equal '1', @posts[0][:form]['number']
assert_equal 'gauge', @posts[0][:form]['mode']
assert_nil @posts[0][:auth]
@auth = true
- assert_equal '403', client.request_post('/api/service/metrics/pos', 'number=30&mode=gauge').code
+ assert_equal '403', client.request_post('/api/service/metrics/pos', 'number=30&mode=gauge', post_header).code
req_with_auth = lambda do |number, mode, user, pass|
- url = URI.parse("http://#{host}:#{port}/api/service/metrics/pos")
- req = Net::HTTP::Post.new(url.path)
+ req = Net::HTTP::Post.new("/api/service/metrics/pos")
+ req.content_type = 'application/x-www-form-urlencoded'
req.basic_auth user, pass
req.set_form_data({'number'=>number, 'mode'=>mode})
req
end
@@ -128,59 +140,59 @@
def teardown
@dummy_server_thread.kill
@dummy_server_thread.join
end
+
+ def create_driver(conf, tag='test.metrics')
+ Fluent::Test::OutputTestDriver.new(Fluent::HTTPOutput, tag).configure(conf)
+ end
end
class HTTPOutputTest < HTTPOutputTestBase
CONFIG = %[
- endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/api/
+ endpoint_url http://127.0.0.1:#{port}/api/
]
CONFIG_JSON = %[
- endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/api/
+ endpoint_url http://127.0.0.1:#{port}/api/
serializer json
]
CONFIG_PUT = %[
- endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/api/
+ endpoint_url http://127.0.0.1:#{port}/api/
http_method put
]
CONFIG_HTTP_ERROR = %[
- endpoint_url https://127.0.0.1:#{TEST_LISTEN_PORT + 1}/api/
+ endpoint_url https://127.0.0.1:#{port - 1}/api/
]
CONFIG_HTTP_ERROR_SUPPRESSED = %[
- endpoint_url https://127.0.0.1:#{TEST_LISTEN_PORT + 1}/api/
+ endpoint_url https://127.0.0.1:#{port - 1}/api/
raise_on_error false
]
RATE_LIMIT_MSEC = 1200
CONFIG_RATE_LIMIT = %[
- endpoint_url http://127.0.0.1:#{TEST_LISTEN_PORT}/api/
+ endpoint_url http://127.0.0.1:#{port}/api/
rate_limit_msec #{RATE_LIMIT_MSEC}
]
- def create_driver(conf=CONFIG, tag='test.metrics')
- Fluent::Test::OutputTestDriver.new(Fluent::HTTPOutput, tag).configure(conf)
- end
-
def test_configure
- d = create_driver
- assert_equal "http://127.0.0.1:#{TEST_LISTEN_PORT}/api/", d.instance.endpoint_url
+ d = create_driver CONFIG
+ assert_equal "http://127.0.0.1:#{self.class.port}/api/", d.instance.endpoint_url
assert_equal :form, d.instance.serializer
d = create_driver CONFIG_JSON
- assert_equal "http://127.0.0.1:#{TEST_LISTEN_PORT}/api/", d.instance.endpoint_url
+ assert_equal "http://127.0.0.1:#{self.class.port}/api/", d.instance.endpoint_url
assert_equal :json, d.instance.serializer
end
def test_emit_form
- d = create_driver
+ d = create_driver CONFIG
d.emit({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => "\xe3\x81\x82".force_encoding("ascii-8bit") })
d.run
assert_equal 1, @posts.size
record = @posts[0]
@@ -216,11 +228,11 @@
assert_equal 0, @posts.size
assert_equal 2, @puts.size
end
def test_emit_json
- binary_string = "\xe3\x81\x82".force_encoding("ascii-8bit")
+ binary_string = "\xe3\x81\x82"
d = create_driver CONFIG_JSON
d.emit({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1, 'binary' => binary_string })
d.run
assert_equal 1, @posts.size
@@ -266,21 +278,21 @@
assert last_emit + RATE_LIMIT_MSEC > _current_msec, "Still under rate limiting interval"
assert_equal 1, @posts.size
wait_msec = 500
sleep (last_emit + RATE_LIMIT_MSEC - _current_msec + wait_msec) * 0.001
-
+
assert last_emit + RATE_LIMIT_MSEC < _current_msec, "No longer under rate limiting interval"
d.emit(record)
d.run
assert_equal 2, @posts.size
end
def _current_msec
Time.now.to_f * 1000
end
-
+
def test_auth
@auth = true # enable authentication of dummy server
d = create_driver(CONFIG, 'test.metrics')
d.emit({ 'field1' => 50, 'field2' => 20, 'field3' => 10, 'otherfield' => 1 })
@@ -310,6 +322,64 @@
assert_equal 1, @posts.size
assert_equal 2, @prohibited
end
+end
+
+class HTTPSOutputTest < HTTPOutputTestBase
+ def self.port
+ 5127
+ end
+
+ def self.server_config
+ config = super
+ config[:SSLEnable] = true
+ config[:SSLCertName] = [["CN", WEBrick::Utils::getservername]]
+ config
+ end
+
+ def self.test_http_client
+ super(
+ use_ssl: true,
+ verify_mode: OpenSSL::SSL::VERIFY_NONE,
+ )
+ end
+
+ def test_configure
+ test_uri = URI.parse("https://127.0.0.1/")
+
+ ssl_config = %[
+ endpoint_url https://127.0.0.1:#{self.class.port}/api/
+ ]
+ d = create_driver ssl_config
+ expected_endpoint_url = "https://127.0.0.1:#{self.class.port}/api/"
+ assert_equal expected_endpoint_url, d.instance.endpoint_url
+ http_opts = d.instance.http_opts(test_uri)
+ assert_equal true, http_opts[:use_ssl]
+ assert_equal OpenSSL::SSL::VERIFY_PEER, http_opts[:verify_mode]
+
+ no_verify_config = %[
+ endpoint_url https://127.0.0.1:#{self.class.port}/api/
+ ssl_no_verify true
+ ]
+ d = create_driver no_verify_config
+ http_opts = d.instance.http_opts(test_uri)
+ assert_equal true, http_opts[:use_ssl]
+ assert_equal OpenSSL::SSL::VERIFY_NONE, http_opts[:verify_mode]
+ end
+
+ def test_emit_form_ssl
+ config = %[
+ endpoint_url https://127.0.0.1:#{self.class.port}/api/
+ ssl_no_verify true
+ ]
+ d = create_driver config
+ d.emit({ 'field1' => 50 })
+ d.run
+
+ assert_equal 1, @posts.size
+ record = @posts[0]
+
+ assert_equal '50', record[:form]['field1']
+ end
end