test/plugin/test_in_http.rb in fluentd-0.14.8 vs test/plugin/test_in_http.rb in fluentd-0.14.9

- old
+ new

@@ -1,9 +1,10 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/plugin/in_http' require 'net/http' +require 'timecop' class HttpInputTest < Test::Unit::TestCase class << self def startup socket_manager_path = ServerEngine::SocketManager::Server.generate_path @@ -18,21 +19,25 @@ def setup Fluent::Test.setup end + def teardown + Timecop.return + end + PORT = unused_port CONFIG = %[ port #{PORT} bind "127.0.0.1" body_size_limit 10m keepalive_timeout 5 respond_with_empty_img true ] def create_driver(conf=CONFIG) - Fluent::Test::InputTestDriver.new(Fluent::HttpInput).configure(conf, true) + Fluent::Test::Driver::Input.new(Fluent::Plugin::HttpInput).configure(conf) end def test_configure d = create_driver assert_equal PORT, d.instance.port @@ -42,329 +47,448 @@ assert_equal false, d.instance.add_http_headers end def test_time d = create_driver + time = event_time("2011-01-02 13:14:15.123 UTC") + Timecop.freeze(Time.at(time)) - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - Fluent::Engine.now = time + events = [ + ["tag1", time, {"a" => 1}], + ["tag2", time, {"a" => 2}], + ] + res_codes = [] - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} - - d.run do - d.expected_emits.each {|tag,_time,record| + d.run(expect_records: 2) do + events.each do |tag, _time, record| res = post("/#{tag}", {"json"=>record.to_json}) - assert_equal "200", res.code - } + res_codes << res.code + end end + + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_time_as_float d = create_driver + time = event_time("2011-01-02 13:14:15.123 UTC") + float_time = time.to_f - float_time = Time.parse("2011-01-02 13:14:15.123 UTC").to_f - time = Fluent::EventTime.from_time(Time.at(float_time)) + events = [ + ["tag1", time, {"a"=>1}], + ] + res_codes = [] - d.expect_emit "tag1", time, {"a"=>1} - - d.run do - d.expected_emits.each {|tag,_time,record| + d.run(expect_records: 1) do + events.each do |tag, t, record| res = post("/#{tag}", {"json"=>record.to_json, "time"=>float_time.to_s}) - assert_equal "200", res.code - } + res_codes << res.code + end end + assert_equal ["200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] end def test_json d = create_driver + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) + events = [ + ["tag1", time_i, {"a"=>1}], + ["tag2", time_i, {"a"=>2}], + ] + res_codes = [] - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}) - assert_equal "200", res.code - } + d.run(expect_records: 2) do + events.each do |tag, t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>t.to_s}) + res_codes << res.code + end end - - d.emit_streams.each { |tag, es| - assert !include_http_header?(es.first[1]) - } + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_multi_json d = create_driver + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - events = [{"a"=>1},{"a"=>2}] + records = [{"a"=>1},{"a"=>2}] + events = [ + ["tag1", time_i, records[0]], + ["tag1", time_i, records[1]], + ] tag = "tag1" - - events.each { |ev| - d.expect_emit tag, time, ev - } - - d.run do - res = post("/#{tag}", {"json"=>events.to_json, "time"=>time.to_s}) - assert_equal "200", res.code + res_codes = [] + d.run(expect_records: 2, timeout: 5) do + res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) + res_codes << res.code end - + assert_equal ["200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_json_with_add_remote_addr d = create_driver(CONFIG + "add_remote_addr true") - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>1} - d.expect_emit "tag2", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>2} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}) - assert_equal "200", res.code - } + events = [ + ["tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>1}], + ["tag2", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>2}], + ] + res_codes = [] + d.run(expect_records: 2) do + events.each do |tag, _t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) + res_codes << res.code + end end - + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_multi_json_with_add_remote_addr d = create_driver(CONFIG + "add_remote_addr true") + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - events = [{"a"=>1},{"a"=>2}] + records = [{"a"=>1},{"a"=>2}] tag = "tag1" + res_codes = [] - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>1} - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"127.0.0.1", "a"=>2} - - d.run do - res = post("/#{tag}", {"json"=>events.to_json, "time"=>time.to_s}) - assert_equal "200", res.code + d.run(expect_records: 2, timeout: 5) do + res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) + res_codes << res.code end + assert_equal ["200"], res_codes + assert_equal "tag1", d.events[0][0] + assert_equal_event_time time, d.events[0][1] + assert_equal 1, d.events[0][2]["a"] + assert{ d.events[0][2].has_key?("REMOTE_ADDR") && d.events[0][2]["REMOTE_ADDR"] =~ /^\d{1,4}(\.\d{1,4}){3}$/ } + + assert_equal "tag1", d.events[1][0] + assert_equal_event_time time, d.events[1][1] + assert_equal 2, d.events[1][2]["a"] end def test_json_with_add_remote_addr_given_x_forwarded_for d = create_driver(CONFIG + "add_remote_addr true") - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1} - d.expect_emit "tag2", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1} + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) - assert_equal "200", res.code - } + d.run(expect_records: 2) do + events.each do |tag, _t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal "tag1", d.events[0][0] + assert_equal_event_time time, d.events[0][1] + assert_equal({"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}, d.events[0][2]) + + assert_equal "tag2", d.events[1][0] + assert_equal_event_time time, d.events[1][1] + assert_equal({"REMOTE_ADDR"=>"129.78.138.66", "a"=>2}, d.events[1][2]) end def test_multi_json_with_add_remote_addr_given_x_forwarded_for d = create_driver(CONFIG + "add_remote_addr true") - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - events = [{"a"=>1},{"a"=>2}] + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + records = [{"a"=>1},{"a"=>2}] + events = [ + ["tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1}], + ["tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>2}], + ] tag = "tag1" + res_codes = [] - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>1} - d.expect_emit "tag1", time, {"REMOTE_ADDR"=>"129.78.138.66", "a"=>2} - - d.run do - res = post("/#{tag}", {"json"=>events.to_json, "time"=>time.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) - assert_equal "200", res.code + d.run(expect_records: 2, timeout: 5) do + res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}, {"X-Forwarded-For"=>"129.78.138.66, 127.0.0.1"}) + res_codes << res.code end - + assert_equal ["200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_multi_json_with_add_http_headers d = create_driver(CONFIG + "add_http_headers true") - - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - events = [{"a"=>1},{"a"=>2}] + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + records = [{"a"=>1},{"a"=>2}] tag = "tag1" + res_codes = [] - d.run do - res = post("/#{tag}", {"json"=>events.to_json, "time"=>time.to_s}) - assert_equal "200", res.code + d.run(expect_records: 2, timeout: 5) do + res = post("/#{tag}", {"json"=>records.to_json, "time"=>time_i.to_s}) + res_codes << res.code end + assert_equal ["200"], res_codes - d.emit_streams.each { |_tag, es| - assert include_http_header?(es.first[1]) - } + assert_equal "tag1", d.events[0][0] + assert_equal_event_time time, d.events[0][1] + assert_equal 1, d.events[0][2]["a"] + + assert_equal "tag1", d.events[1][0] + assert_equal_event_time time, d.events[1][1] + assert_equal 2, d.events[1][2]["a"] + + assert include_http_header?(d.events[0][2]) + assert include_http_header?(d.events[1][2]) end def test_json_with_add_http_headers d = create_driver(CONFIG + "add_http_headers true") + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) + d.run(expect_records: 2) do + events.each do |tag, t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) + res_codes << res.code + end + end + assert_equal ["200", "200"], res_codes - records = [["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}]] + assert_equal "tag1", d.events[0][0] + assert_equal_event_time time, d.events[0][1] + assert_equal 1, d.events[0][2]["a"] - d.run do - records.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}) - assert_equal "200", res.code + assert_equal "tag2", d.events[1][0] + assert_equal_event_time time, d.events[1][1] + assert_equal 2, d.events[1][2]["a"] - } - end - - d.emit_streams.each { |tag, es| - assert include_http_header?(es.first[1]) - } + assert include_http_header?(d.events[0][2]) + assert include_http_header?(d.events[1][2]) end def test_application_json d = create_driver + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}?time=#{_time.to_s}", record.to_json, {"Content-Type"=>"application/json; charset=utf-8"}) - assert_equal "200", res.code - } + d.run(expect_records: 2) do + events.each do |tag, t, record| + res = post("/#{tag}?time=#{time_i.to_s}", record.to_json, {"Content-Type"=>"application/json; charset=utf-8"}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_msgpack d = create_driver + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} - - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"msgpack"=>record.to_msgpack, "time"=>_time.to_s}) - assert_equal "200", res.code - } + d.run(expect_records: 2) do + events.each do |tag, t, record| + res = post("/#{tag}", {"msgpack"=>record.to_msgpack, "time"=>time_i.to_s}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_multi_msgpack d = create_driver - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i - events = [{"a"=>1},{"a"=>2}] + records = [{"a"=>1},{"a"=>2}] + events = [ + ["tag1", time, records[0]], + ["tag1", time, records[1]], + ] tag = "tag1" - - events.each { |ev| - d.expect_emit tag, time, ev - } - - d.run do - res = post("/#{tag}", {"msgpack"=>events.to_msgpack, "time"=>time.to_s}) - assert_equal "200", res.code + res_codes = [] + d.run(expect_records: 2) do + res = post("/#{tag}", {"msgpack"=>records.to_msgpack, "time"=>time_i.to_s}) + res_codes << res.code end - + assert_equal ["200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_with_regexp d = create_driver(CONFIG + %[ format /^(?<field_1>\\d+):(?<field_2>\\w+)$/ types field_1:integer ]) - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"field_1" => 1, "field_2" => 'str'}], + ["tag2", time, {"field_1" => 2, "field_2" => 'str'}], + ] + res_codes = [] - d.expect_emit "tag1", time, {"field_1" => 1, "field_2" => 'str'} - d.expect_emit "tag2", time, {"field_1" => 2, "field_2" => 'str'} - - d.run do - d.expected_emits.each { |tag, _time, record| + d.run(expect_records: 2) do + events.each do |tag, t, record| body = record.map { |k, v| v.to_s }.join(':') - res = post("/#{tag}?time=#{_time.to_s}", body, {'Content-Type' => 'application/octet-stream'}) - assert_equal "200", res.code - } + res = post("/#{tag}?time=#{time_i.to_s}", body, {'Content-Type' => 'application/octet-stream'}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_with_csv require 'csv' d = create_driver(CONFIG + %[ format csv keys foo,bar ]) + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"foo" => "1", "bar" => 'st"r'}], + ["tag2", time, {"foo" => "2", "bar" => 'str'}], + ] + res_codes = [] - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) - - d.expect_emit "tag1", time, {"foo" => "1", "bar" => 'st"r'} - d.expect_emit "tag2", time, {"foo" => "2", "bar" => 'str'} - - d.run do - d.expected_emits.each { |tag, _time, record| + d.run(expect_records: 2) do + events.each do |tag, t, record| body = record.map { |k, v| v }.to_csv - res = post("/#{tag}?time=#{_time.to_s}", body, {'Content-Type' => 'text/comma-separated-values'}) - assert_equal "200", res.code - } + res = post("/#{tag}?time=#{time_i.to_s}", body, {'Content-Type' => 'text/comma-separated-values'}) + res_codes << res.code + end end + assert_equal ["200", "200"], res_codes + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_resonse_with_empty_img d = create_driver(CONFIG + "respond_with_empty_img true") assert_equal true, d.instance.respond_with_empty_img - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + res_bodies = [] - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>2} - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}) - assert_equal "200", res.code + events.each do |tag, _t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}) + res_codes << res.code # Ruby returns ASCII-8 encoded string for GIF. - assert_equal Fluent::HttpInput::EMPTY_GIF_IMAGE, res.body.force_encoding("UTF-8") - } + res_bodies << res.body.force_encoding("UTF-8") + end end + assert_equal ["200", "200"], res_codes + assert_equal [Fluent::Plugin::HttpInput::EMPTY_GIF_IMAGE, Fluent::Plugin::HttpInput::EMPTY_GIF_IMAGE], res_bodies + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_cors_allowed d = create_driver(CONFIG + "cors_allow_origins [\"http://foo.com\"]") assert_equal ["http://foo.com"], d.instance.cors_allow_origins - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + events = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + res_codes = [] + res_headers = [] - d.expect_emit "tag1", time, {"a"=>1} - d.expect_emit "tag2", time, {"a"=>1} - d.run do - d.expected_emits.each {|tag,_time,record| - res = post("/#{tag}", {"json"=>record.to_json, "time"=>_time.to_s}, {"Origin"=>"http://foo.com"}) - assert_equal "200", res.code - assert_equal "http://foo.com", res["Access-Control-Allow-Origin"] - } + events.each do |tag, _t, record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://foo.com"}) + res_codes << res.code + res_headers << res["Access-Control-Allow-Origin"] + end end + assert_equal ["200", "200"], res_codes + assert_equal ["http://foo.com", "http://foo.com"], res_headers + assert_equal events, d.events + assert_equal_event_time time, d.events[0][1] + assert_equal_event_time time, d.events[1][1] end def test_cors_disallowed d = create_driver(CONFIG + "cors_allow_origins [\"http://foo.com\"]") assert_equal ["http://foo.com"], d.instance.cors_allow_origins - time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) + time = event_time("2011-01-02 13:14:15 UTC") + time_i = time.to_i + res_codes = [] - d.expected_emits_length = 0 + d.end_if{ res_codes.size == 2 } d.run do - res = post("/tag1", {"json"=>{"a"=>1}.to_json, "time"=>time.to_s}, {"Origin"=>"http://bar.com"}) - assert_equal "403", res.code - - res = post("/tag2", {"json"=>{"a"=>1}.to_json, "time"=>time.to_s}, {"Origin"=>"http://bar.com"}) - assert_equal "403", res.code + res = post("/tag1", {"json"=>{"a"=>1}.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://bar.com"}) + res_codes << res.code + res = post("/tag2", {"json"=>{"a"=>1}.to_json, "time"=>time_i.to_s}, {"Origin"=>"http://bar.com"}) + res_codes << res.code end + assert_equal ["403", "403"], res_codes end $test_in_http_connection_object_ids = [] $test_in_http_content_types = [] $test_in_http_content_types_flag = false @@ -386,10 +510,10 @@ $test_in_http_connection_object_ids << @io_handler.object_id end end end - class Fluent::HttpInput::Handler + class Fluent::Plugin::HttpInput::Handler prepend ContentTypeHook end def test_if_content_type_is_initialized_properly # This test is to check if Fluent::HttpInput::Handler's @content_type is initialized properly.