require_relative '../helper' require 'fluent/test/driver/input' require 'fluent/plugin/in_http' require 'net/http' require 'timecop' class HttpInputTest < Test::Unit::TestCase class << self def startup socket_manager_path = ServerEngine::SocketManager::Server.generate_path @server = ServerEngine::SocketManager::Server.open(socket_manager_path) ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s end def shutdown @server.close end end def setup Fluent::Test.setup @port = unused_port end def teardown Timecop.return @port = nil end def config %[ port #{@port} bind "127.0.0.1" body_size_limit 10m keepalive_timeout 5 respond_with_empty_img true use_204_response false ] end def create_driver(conf=config) Fluent::Test::Driver::Input.new(Fluent::Plugin::HttpInput).configure(conf) end def test_configure d = create_driver assert_equal @port, d.instance.port assert_equal '127.0.0.1', d.instance.bind assert_equal 10*1024*1024, d.instance.body_size_limit assert_equal 5, d.instance.keepalive_timeout assert_equal false, d.instance.add_http_headers assert_equal false, d.instance.add_query_params end def test_time d = create_driver time = event_time("2011-01-02 13:14:15.123 UTC") Timecop.freeze(Time.at(time)) events = [ ["tag1", time, {"a" => 1}], ["tag2", time, {"a" => 2}], ] res_codes = [] d.run(expect_records: 2) do events.each do |tag, _time, record| res = post("/#{tag}", {"json"=>record.to_json}) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_time_as_float d = create_driver time = event_time("2011-01-02 13:14:15.123 UTC") float_time = time.to_f events = [ ["tag1", time, {"a"=>1}], ] res_codes = [] d.run(expect_records: 1) do events.each do |tag, t, record| res = post("/#{tag}", {"json"=>record.to_json, "time"=>float_time.to_s}) res_codes << res.code end end assert_equal ["200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] end def test_json d = create_driver time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time_i, {"a"=>1}], ["tag2", time_i, {"a"=>2}], ] res_codes = [] d.run(expect_records: 2) do events.each do |tag, t, record| res = post("/#{tag}", {"json"=>record.to_json, "time"=>t.to_s}) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end data('json' => ['json', :to_json], 'msgpack' => ['msgpack', :to_msgpack]) def test_default_with_time_format(data) param, method_name = data d = create_driver(config + %[ keep_time_key time_format %iso8601 ]) time = event_time("2020-06-10T01:14:27+00:00") events = [ ["tag1", time, {"a" => 1, "time" => '2020-06-10T01:14:27+00:00'}], ["tag2", time, {"a" => 2, "time" => '2020-06-10T01:14:27+00:00'}], ] res_codes = [] d.run(expect_records: 2) do events.each do |tag, t, record| res = post("/#{tag}", {param => record.__send__(method_name)}) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_multi_json d = create_driver time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i records = [{"a"=>1},{"a"=>2}] events = [ ["tag1", time_i, records[0]], ["tag1", time_i, records[1]], ] tag = "tag1" res_codes = [] d.run(expect_records: 2, timeout: 5) do res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) res_codes << res.code end assert_equal ["200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_multi_json_with_time_field d = create_driver time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i time_f = time.to_f records = [{"a" => 1, 'time' => time_i},{"a" => 2, 'time' => time_f}] events = [ ["tag1", time, {'a' => 1}], ["tag1", time, {'a' => 2}], ] tag = "tag1" res_codes = [] d.run(expect_records: 2, timeout: 5) do res = post("/#{tag}", {"json" => records.to_json}) res_codes << res.code end assert_equal ["200"], res_codes assert_equal events, d.events assert_instance_of Fluent::EventTime, d.events[0][1] assert_instance_of Fluent::EventTime, d.events[1][1] assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end data('json' => ['json', :to_json], 'msgpack' => ['msgpack', :to_msgpack]) def test_default_multi_with_time_format(data) param, method_name = data d = create_driver(config + %[ keep_time_key time_format %iso8601 ]) time = event_time("2020-06-10T01:14:27+00:00") events = [ ["tag1", time, {'a' => 1, 'time' => "2020-06-10T01:14:27+00:00"}], ["tag1", time, {'a' => 2, 'time' => "2020-06-10T01:14:27+00:00"}], ] tag = "tag1" res_codes = [] d.run(expect_records: 2, timeout: 5) do res = post("/#{tag}", {param => events.map { |e| e[2] }.__send__(method_name)}) res_codes << res.code end assert_equal ["200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_multi_json_with_nonexistent_time_key d = create_driver(config + %[ time_key missing ]) time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i time_f = time.to_f records = [{"a" => 1, 'time' => time_i},{"a" => 2, 'time' => time_f}] tag = "tag1" res_codes = [] d.run(expect_records: 2, timeout: 5) do res = post("/#{tag}", {"json" => records.to_json}) res_codes << res.code end assert_equal ["200"], res_codes assert_equal 2, d.events.size assert_not_equal time_i, d.events[0][1].sec # current time is used because "missing" field doesn't exist assert_not_equal time_i, d.events[1][1].sec end def test_json_with_add_remote_addr d = create_driver(config + "add_remote_addr true") time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>1}], ["tag2", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>2}], ] res_codes = [] d.run(expect_records: 2) do events.each do |tag, _t, record| res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_exact_match_for_expect d = create_driver(config) records = [{ "a" => 1}, { "a" => 2 }] tag = "tag1" res_codes = [] d.run(expect_records: 0, timeout: 5) do res = post("/#{tag}", { "json" => records.to_json }, { 'Expect' => 'something' }) res_codes << res.code end assert_equal ["417"], res_codes end def test_exact_match_for_expect_with_other_header d = create_driver(config) records = [{ "a" => 1}, { "a" => 2 }] tag = "tag1" res_codes = [] d.run(expect_records: 2, timeout: 5) do res = post("/#{tag}", { "json" => records.to_json, 'x-envoy-expected-rq-timeout-ms' => 4 }) res_codes << res.code end assert_equal ["200"], res_codes assert_equal "tag1", d.events[0][0] assert_equal 1, d.events[0][2]["a"] assert_equal "tag1", d.events[1][0] assert_equal 2, d.events[1][2]["a"] end def test_multi_json_with_add_remote_addr d = create_driver(config + "add_remote_addr true") time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i records = [{"a"=>1},{"a"=>2}] tag = "tag1" res_codes = [] d.run(expect_records: 2, timeout: 5) do res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) res_codes << res.code end assert_equal ["200"], res_codes assert_equal "tag1", d.events[0][0] assert_equal_event_time time, d.events[0][1] assert_equal 1, d.events[0][2]["a"] assert{ d.events[0][2].has_key?("REMOTE_ADDR") && d.events[0][2]["REMOTE_ADDR"] =~ /^\d{1,4}(\.\d{1,4}){3}$/ } assert_equal "tag1", d.events[1][0] assert_equal_event_time time, d.events[1][1] assert_equal 2, d.events[1][2]["a"] end def test_json_with_add_remote_addr_given_x_forwarded_for d = create_driver(config + "add_remote_addr true") time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}], ] res_codes = [] d.run(expect_records: 2) do events.each do |tag, _t, record| res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal "tag1", d.events[0][0] assert_equal_event_time time, d.events[0][1] assert_equal({"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}, d.events[0][2]) assert_equal "tag2", d.events[1][0] assert_equal_event_time time, d.events[1][1] assert_equal({"REMOTE_ADDR"=>"129.78.138.66", "a"=>2}, d.events[1][2]) end def test_multi_json_with_add_remote_addr_given_x_forwarded_for d = create_driver(config + "add_remote_addr true") tag = "tag1" time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i records = [{"a"=>1},{"a"=>2}] events = [ [tag, time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}], [tag, time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>2}], ] res_codes = [] d.run(expect_records: 2, timeout: 5) do res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) res_codes << res.code end assert_equal ["200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_add_remote_addr_given_multi_x_forwarded_for d = create_driver(config + "add_remote_addr true") tag = "tag1" time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i record = {"a" => 1} event = ["tag1", time, {"REMOTE_ADDR" => "129.78.138.66", "a" => 1}] res_code = nil d.run(expect_records: 1, timeout: 5) do res = post("/#{tag}", {"json" => record.to_json, "time" => time_i.to_s}) { |http, req| # net/http can't send multiple headers so overwrite it. def req.each_capitalized block_given? or return enum_for(__method__) { @header.size } @header.each do |k, vs| vs.each { |v| yield capitalize(k), v } end end req.add_field("X-Forwarded-For", "129.78.138.66, 127.0.0.1") req.add_field("X-Forwarded-For", "8.8.8.8") } res_code = res.code end assert_equal "200", res_code assert_equal event, d.events.first assert_equal_event_time time, d.events.first[1] end def test_multi_json_with_add_http_headers d = create_driver(config + "add_http_headers true") time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i records = [{"a"=>1},{"a"=>2}] tag = "tag1" res_codes = [] d.run(expect_records: 2, timeout: 5) do res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) res_codes << res.code end assert_equal ["200"], res_codes assert_equal "tag1", d.events[0][0] assert_equal_event_time time, d.events[0][1] assert_equal 1, d.events[0][2]["a"] assert_equal "tag1", d.events[1][0] assert_equal_event_time time, d.events[1][1] assert_equal 2, d.events[1][2]["a"] assert include_http_header?(d.events[0][2]) assert include_http_header?(d.events[1][2]) end def test_json_with_add_http_headers d = create_driver(config + "add_http_headers true") time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}], ] res_codes = [] d.run(expect_records: 2) do events.each do |tag, t, record| res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal "tag1", d.events[0][0] assert_equal_event_time time, d.events[0][1] assert_equal 1, d.events[0][2]["a"] assert_equal "tag2", d.events[1][0] assert_equal_event_time time, d.events[1][1] assert_equal 2, d.events[1][2]["a"] assert include_http_header?(d.events[0][2]) assert include_http_header?(d.events[1][2]) end def test_multi_json_with_custom_parser d = create_driver(config + %[ @type json keep_time_key true time_key foo time_format %iso8601 ]) time = event_time("2011-01-02 13:14:15 UTC") time_s = Time.at(time).iso8601 records = [{"foo"=>time_s,"bar"=>"test1"},{"foo"=>time_s,"bar"=>"test2"}] tag = "tag1" res_codes = [] d.run(expect_records: 2, timeout: 5) do res = post("/#{tag}", records.to_json, {"Content-Type"=>"application/octet-stream"}) res_codes << res.code end assert_equal ["200"], res_codes assert_equal "tag1", d.events[0][0] assert_equal_event_time time, d.events[0][1] assert_equal d.events[0][2], records[0] assert_equal "tag1", d.events[1][0] assert_equal_event_time time, d.events[1][1] assert_equal d.events[1][2], records[1] end def test_application_json d = create_driver time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}], ] res_codes = [] d.run(expect_records: 2) do events.each do |tag, t, record| res = post("/#{tag}?time=#{time_i.to_s}", record.to_json, {"Content-Type"=>"application/json; charset=utf-8"}) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_application_msgpack d = create_driver time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}], ] res_codes = [] d.run(expect_records: 2) do events.each do |tag, t, record| res = post("/#{tag}?time=#{time_i.to_s}", record.to_msgpack, {"Content-Type"=>"application/msgpack"}) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_msgpack d = create_driver time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}], ] res_codes = [] d.run(expect_records: 2) do events.each do |tag, t, record| res = post("/#{tag}", {"msgpack"=>record.to_msgpack, "time"=>time_i.to_s}) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_multi_msgpack d = create_driver time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i records = [{"a"=>1},{"a"=>2}] events = [ ["tag1", time, records[0]], ["tag1", time, records[1]], ] tag = "tag1" res_codes = [] d.run(expect_records: 2) do res = post("/#{tag}", {"msgpack"=>records.to_msgpack, "time"=>time_i.to_s}) res_codes << res.code end assert_equal ["200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_with_regexp d = create_driver(config + %[ format /^(?\\d+):(?\\w+)$/ types field_1:integer ]) time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time, {"field_1" => 1, "field_2" => 'str'}], ["tag2", time, {"field_1" => 2, "field_2" => 'str'}], ] res_codes = [] d.run(expect_records: 2) do events.each do |tag, t, record| body = record.map { |k, v| v.to_s }.join(':') res = post("/#{tag}?time=#{time_i.to_s}", body, {'Content-Type' => 'application/octet-stream'}) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_with_csv require 'csv' d = create_driver(config + %[ format csv keys foo,bar ]) time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time, {"foo" => "1", "bar" => 'st"r'}], ["tag2", time, {"foo" => "2", "bar" => 'str'}], ] res_codes = [] d.run(expect_records: 2) do events.each do |tag, t, record| body = record.map { |k, v| v }.to_csv res = post("/#{tag}?time=#{time_i.to_s}", body, {'Content-Type' => 'text/comma-separated-values'}) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_response_with_empty_img d = create_driver(config) assert_equal true, d.instance.respond_with_empty_img time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}], ] res_codes = [] res_bodies = [] d.run do events.each do |tag, _t, record| res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) res_codes << res.code # Ruby returns ASCII-8 encoded string for GIF. res_bodies << res.body.force_encoding("UTF-8") end end assert_equal ["200", "200"], res_codes assert_equal [Fluent::Plugin::HttpInput::EMPTY_GIF_IMAGE, Fluent::Plugin::HttpInput::EMPTY_GIF_IMAGE], res_bodies assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_response_without_empty_img d = create_driver(config + "respond_with_empty_img false") assert_equal false, d.instance.respond_with_empty_img time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}], ] res_codes = [] res_bodies = [] d.run do events.each do |tag, _t, record| res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal [], res_bodies assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_response_use_204_response d = create_driver(config + %[ respond_with_empty_img false use_204_response true ]) assert_equal true, d.instance.use_204_response time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}], ] res_codes = [] res_bodies = [] d.run do events.each do |tag, _t, record| res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) res_codes << res.code end end assert_equal ["204", "204"], res_codes assert_equal [], res_bodies assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_cors_allowed d = create_driver(config + "cors_allow_origins [\"http://foo.com\"]") assert_equal ["http://foo.com"], d.instance.cors_allow_origins time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}], ] res_codes = [] res_headers = [] d.run do events.each do |tag, _t, record| res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://foo.com"}) res_codes << res.code res_headers << res["Access-Control-Allow-Origin"] end end assert_equal ["200", "200"], res_codes assert_equal ["http://foo.com", "http://foo.com"], res_headers assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_cors_allowed_wildcard d = create_driver(config + 'cors_allow_origins ["*"]') time = event_time("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], ] d.run do events.each do |tag, time, record| headers = {"Origin" => "http://foo.com"} res = post("/#{tag}", {"json" => record.to_json, "time" => time.to_i}, headers) assert_equal "200", res.code assert_equal "*", res["Access-Control-Allow-Origin"] end end end def test_get_request d = create_driver(config) d.run do res = get("/cors.test", {}, {}) assert_equal "200", res.code end end def test_cors_preflight d = create_driver(config + 'cors_allow_origins ["*"]') d.run do header = { "Origin" => "http://foo.com", "Access-Control-Request-Method" => "POST", "Access-Control-Request-Headers" => "Content-Type", } res = options("/cors.test", {}, header) assert_equal "200", res.code assert_equal "*", res["Access-Control-Allow-Origin"] assert_equal "POST", res["Access-Control-Allow-Methods"] end end def test_cors_allowed_wildcard_for_subdomain d = create_driver(config + 'cors_allow_origins ["http://*.foo.com"]') time = event_time("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], ] d.run do events.each do |tag, time, record| headers = {"Origin" => "http://subdomain.foo.com"} res = post("/#{tag}", {"json" => record.to_json, "time" => time.to_i}, headers) assert_equal "200", res.code assert_equal "http://subdomain.foo.com", res["Access-Control-Allow-Origin"] end end end def test_cors_allowed_exclude_empty_string d = create_driver(config + 'cors_allow_origins ["", "http://*.foo.com"]') time = event_time("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], ] d.run do events.each do |tag, time, record| headers = {"Origin" => "http://subdomain.foo.com"} res = post("/#{tag}", {"json" => record.to_json, "time" => time.to_i}, headers) assert_equal "200", res.code assert_equal "http://subdomain.foo.com", res["Access-Control-Allow-Origin"] end end end def test_cors_allowed_wildcard_preflight_for_subdomain d = create_driver(config + 'cors_allow_origins ["http://*.foo.com"]') d.run do header = { "Origin" => "http://subdomain.foo.com", "Access-Control-Request-Method" => "POST", "Access-Control-Request-Headers" => "Content-Type", } res = options("/cors.test", {}, header) assert_equal "200", res.code assert_equal "http://subdomain.foo.com", res["Access-Control-Allow-Origin"] assert_equal "POST", res["Access-Control-Allow-Methods"] end end def test_cors_allow_credentials d = create_driver(config + %[ cors_allow_origins ["http://foo.com"] cors_allow_credentials ]) assert_equal true, d.instance.cors_allow_credentials time = event_time("2011-01-02 13:14:15 UTC") event = ["tag1", time, {"a"=>1}] res_code = nil res_header = nil d.run do res = post("/#{event[0]}", {"json"=>event[2].to_json, "time"=>time.to_i.to_s}, {"Origin"=>"http://foo.com"}) res_code = res.code res_header = res["Access-Control-Allow-Credentials"] end assert_equal( { response_code: "200", allow_credentials_header: "true", events: [event] }, { response_code: res_code, allow_credentials_header: res_header, events: d.events } ) end def test_cors_allow_credentials_for_wildcard_origins assert_raise(Fluent::ConfigError) do create_driver(config + %[ cors_allow_origins ["*"] cors_allow_credentials ]) end end def test_content_encoding_gzip d = create_driver time = event_time("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}], ] res_codes = [] d.run do events.each do |tag, time, record| header = {'Content-Type'=>'application/json', 'Content-Encoding'=>'gzip'} res = post("/#{tag}?time=#{time}", compress_gzip(record.to_json), header) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_content_encoding_deflate d = create_driver time = event_time("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}], ] res_codes = [] d.run do events.each do |tag, time, record| header = {'Content-Type'=>'application/msgpack', 'Content-Encoding'=>'deflate'} res = post("/#{tag}?time=#{time}", Zlib.deflate(record.to_msgpack), header) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal events, d.events assert_equal_event_time time, d.events[0][1] assert_equal_event_time time, d.events[1][1] end def test_cors_disallowed d = create_driver(config + "cors_allow_origins [\"http://foo.com\"]") assert_equal ["http://foo.com"], d.instance.cors_allow_origins time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i res_codes = [] d.end_if{ res_codes.size == 2 } d.run do res = post("/tag1", {"json"=>{"a"=>1}.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://bar.com"}) res_codes << res.code res = post("/tag2", {"json"=>{"a"=>1}.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://bar.com"}) res_codes << res.code end assert_equal ["403", "403"], res_codes end def test_add_query_params d = create_driver(config + "add_query_params true") assert_equal true, d.instance.add_query_params time = event_time("2011-01-02 13:14:15 UTC") time_i = time.to_i events = [ ["tag1", time, {"a"=>1, "QUERY_A"=>"b"}], ["tag2", time, {"a"=>2, "QUERY_A"=>"b"}], ] res_codes = [] res_bodies = [] d.run do events.each do |tag, _t, record| res = post("/#{tag}?a=b", {"json"=>record.to_json, "time"=>time_i.to_s}) res_codes << res.code end end assert_equal ["200", "200"], res_codes assert_equal [], res_bodies assert_equal events, d.events end $test_in_http_connection_object_ids = [] $test_in_http_content_types = [] $test_in_http_content_types_flag = false module ContentTypeHook def initialize(*args) @io_handler = nil super end def on_headers_complete(headers) super if $test_in_http_content_types_flag $test_in_http_content_types << self.content_type end end def on_message_begin super if $test_in_http_content_types_flag $test_in_http_connection_object_ids << @io_handler.object_id end end end class Fluent::Plugin::HttpInput::Handler prepend ContentTypeHook end def test_if_content_type_is_initialized_properly # This test is to check if Fluent::HttpInput::Handler's @content_type is initialized properly. # Especially when in Keep-Alive and the second request has no 'Content-Type'. begin d = create_driver $test_in_http_content_types_flag = true d.run do # Send two requests the second one has no Content-Type in Keep-Alive Net::HTTP.start("127.0.0.1", @port) do |http| req = Net::HTTP::Post.new("/foodb/bartbl", {"connection" => "keepalive", "Content-Type" => "application/json"}) http.request(req) req = Net::HTTP::Get.new("/foodb/bartbl", {"connection" => "keepalive"}) http.request(req) end end ensure $test_in_http_content_types_flag = false end assert_equal(['application/json', ''], $test_in_http_content_types) # Asserting keepalive assert_equal $test_in_http_connection_object_ids[0], $test_in_http_connection_object_ids[1] end def get(path, params, header = {}) http = Net::HTTP.new("127.0.0.1", @port) req = Net::HTTP::Get.new(path, header) http.request(req) end def options(path, params, header = {}) http = Net::HTTP.new("127.0.0.1", @port) req = Net::HTTP::Options.new(path, header) http.request(req) end def post(path, params, header = {}, &block) http = Net::HTTP.new("127.0.0.1", @port) req = Net::HTTP::Post.new(path, header) block.call(http, req) if block if params.is_a?(String) unless header.has_key?('Content-Type') header['Content-Type'] = 'application/octet-stream' end req.body = params else unless header.has_key?('Content-Type') header['Content-Type'] = 'application/x-www-form-urlencoded' end req.set_form_data(params) end http.request(req) end def compress_gzip(data) io = StringIO.new io.binmode Zlib::GzipWriter.wrap(io) { |gz| gz.write data } return io.string end def include_http_header?(record) record.keys.find { |header| header.start_with?('HTTP_') } end end