test/test_out_s3.rb in fluent-plugin-s3-0.8.8 vs test/test_out_s3.rb in fluent-plugin-s3-1.0.0.rc1

- old
+ new

@@ -1,17 +1,23 @@ require 'fluent/test' +require 'fluent/test/helpers' +require 'fluent/test/log' +require 'fluent/test/driver/output' +require 'aws-sdk-resources' require 'fluent/plugin/out_s3' require 'test/unit/rr' require 'zlib' require 'fileutils' require 'timecop' +require 'uuidtools' +include Fluent::Test::Helpers + class S3OutputTest < Test::Unit::TestCase def setup - require 'aws-sdk-resources' - Fluent::Test.setup +# Fluent::Test.setup end def teardown Dir.glob('test/tmp/*').each {|file| FileUtils.rm_f(file) } end @@ -21,14 +27,19 @@ aws_sec_key test_sec_key s3_bucket test_bucket path log utc buffer_type memory + time_slice_format %Y%m%d-%H ] def create_driver(conf = CONFIG) - Fluent::Test::TimeSlicedOutputTestDriver.new(Fluent::S3Output) do + Fluent::Test::Driver::Output.new(Fluent::Plugin::S3Output) do + def format(tag, time, record) + super + end + def write(chunk) chunk.read end private @@ -64,11 +75,11 @@ data('US West (Oregon)' => 's3-us-west-2.amazonaws.com', 'EU (Frankfurt)' => 's3.eu-central-1.amazonaws.com', 'Asia Pacific (Tokyo)' => 's3-ap-northeast-1.amazonaws.com') def test_s3_endpoint_with_invalid_endpoint(endpoint) assert_raise(Fluent::ConfigError, "s3_endpoint parameter is not supported, use s3_region instead. This parameter is for S3 compatible services") { - d = create_driver(CONFIG + "s3_endpoint #{endpoint}") + create_driver(CONFIG + "s3_endpoint #{endpoint}") } end def test_configure_with_mime_type_json conf = CONFIG.clone @@ -127,186 +138,150 @@ d = create_driver(conf) assert_equal false, d.instance.check_bucket assert_equal false, d.instance.check_object end - def test_config_with_hostname_placeholder - d = create_driver(<<EOC) - aws_key_id test_key_id - aws_sec_key test_sec_key - s3_bucket test_bucket - path log/%{hostname}/test - s3_object_key_format %{path}%{hostname}_%{index} - buffer_type memory -EOC - assert_match /#{Socket.gethostname}/, d.instance.s3_object_key_format - assert_match /#{Socket.gethostname}/, d.instance.path - end - - def test_configure_with_grant - conf = CONFIG.clone - conf << "\grant_full_control id='0123456789'\ngrant_read id='1234567890'\ngrant_read_acp id='2345678901'\ngrant_write_acp id='3456789012'\n" - d = create_driver(conf) - assert_equal "id='0123456789'", d.instance.grant_full_control - assert_equal "id='1234567890'", d.instance.grant_read - assert_equal "id='2345678901'", d.instance.grant_read_acp - assert_equal "id='3456789012'", d.instance.grant_write_acp - end - - def test_path_slicing - config = CONFIG.clone.gsub(/path\slog/, "path log/%Y/%m/%d") - d = create_driver(config) - path_slicer = d.instance.instance_variable_get(:@path_slicer) - path = d.instance.instance_variable_get(:@path) - slice = path_slicer.call(path) - assert_equal slice, Time.now.utc.strftime("log/%Y/%m/%d") - end - - def test_path_slicing_utc - config = CONFIG.clone.gsub(/path\slog/, "path log/%Y/%m/%d") - config << "\nutc\n" - d = create_driver(config) - path_slicer = d.instance.instance_variable_get(:@path_slicer) - path = d.instance.instance_variable_get(:@path) - slice = path_slicer.call(path) - assert_equal slice, Time.now.utc.strftime("log/%Y/%m/%d") - end - def test_format d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) - - d.expect_format %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] - d.expect_format %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] - - d.run + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, { "a" => 1 }) + d.feed(time, { "a" => 2 }) + end + expected = [ + %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n], + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] + ] + assert_equal(expected, d.formatted) end def test_format_included_tag_and_time config = [CONFIG, 'include_tag_key true', 'include_time_key true'].join("\n") d = create_driver(config) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) - - d.expect_format %[2011-01-02T13:14:15Z\ttest\t{"a":1,"tag":"test","time":"2011-01-02T13:14:15Z"}\n] - d.expect_format %[2011-01-02T13:14:15Z\ttest\t{"a":2,"tag":"test","time":"2011-01-02T13:14:15Z"}\n] - - d.run + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, { "a" => 1 }) + d.feed(time, { "a" => 2 }) + end + expected = [ + %[2011-01-02T13:14:15Z\ttest\t{"a":1,"tag":"test","time":"2011-01-02T13:14:15Z"}\n], + %[2011-01-02T13:14:15Z\ttest\t{"a":2,"tag":"test","time":"2011-01-02T13:14:15Z"}\n] + ] + assert_equal(expected, d.formatted) end def test_format_with_format_ltsv config = [CONFIG, 'format ltsv'].join("\n") d = create_driver(config) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1, "b"=>1}, time) - d.emit({"a"=>2, "b"=>2}, time) - - d.expect_format %[a:1\tb:1\n] - d.expect_format %[a:2\tb:2\n] - - d.run + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, {"a"=>1, "b"=>1}) + d.feed(time, {"a"=>2, "b"=>2}) + end + expected = [ + %[a:1\tb:1\n], + %[a:2\tb:2\n] + ] + assert_equal(expected, d.formatted) end def test_format_with_format_json config = [CONFIG, 'format json'].join("\n") d = create_driver(config) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) - - d.expect_format %[{"a":1}\n] - d.expect_format %[{"a":2}\n] - - d.run + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end + expected = [ + %[{"a":1}\n], + %[{"a":2}\n] + ] + assert_equal(expected, d.formatted) end def test_format_with_format_json_included_tag config = [CONFIG, 'format json', 'include_tag_key true'].join("\n") d = create_driver(config) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) - - d.expect_format %[{"a":1,"tag":"test"}\n] - d.expect_format %[{"a":2,"tag":"test"}\n] - - d.run + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end + expected = [ + %[{"a":1,"tag":"test"}\n], + %[{"a":2,"tag":"test"}\n] + ] + assert_equal(expected, d.formatted) end def test_format_with_format_json_included_time config = [CONFIG, 'format json', 'include_time_key true'].join("\n") d = create_driver(config) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) - - d.expect_format %[{"a":1,"time":"2011-01-02T13:14:15Z"}\n] - d.expect_format %[{"a":2,"time":"2011-01-02T13:14:15Z"}\n] - - d.run + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end + expected = [ + %[{"a":1,"time":"2011-01-02T13:14:15Z"}\n], + %[{"a":2,"time":"2011-01-02T13:14:15Z"}\n] + ] + assert_equal(expected, d.formatted) end def test_format_with_format_json_included_tag_and_time config = [CONFIG, 'format json', 'include_tag_key true', 'include_time_key true'].join("\n") d = create_driver(config) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) - - d.expect_format %[{"a":1,"tag":"test","time":"2011-01-02T13:14:15Z"}\n] - d.expect_format %[{"a":2,"tag":"test","time":"2011-01-02T13:14:15Z"}\n] - - d.run + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end + expected = [ + %[{"a":1,"tag":"test","time":"2011-01-02T13:14:15Z"}\n], + %[{"a":2,"tag":"test","time":"2011-01-02T13:14:15Z"}\n] + ] + assert_equal(expected, d.formatted) end - def test_chunk_to_write - d = create_driver - - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) - - # S3OutputTest#write returns chunk.read - data = d.run - - assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + - %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n], - data.first - end - CONFIG_TIME_SLICE = %[ aws_key_id test_key_id aws_sec_key test_sec_key s3_bucket test_bucket s3_object_key_format %{path}/events/ts=%{time_slice}/events_%{index}-%{hostname}.%{file_extension} time_slice_format %Y%m%d-%H path log utc buffer_type memory - log_level debug + @log_level debug check_bucket true check_object true ] def create_time_sliced_driver(conf = CONFIG_TIME_SLICE) - d = Fluent::Test::TimeSlicedOutputTestDriver.new(Fluent::S3Output) do + Fluent::Test::Driver::Output.new(Fluent::Plugin::S3Output) do + def format(tag, time, record) + super + end + + def write(chunk) + super + end + private def check_apikeys end end.configure(conf) - d end def test_write_with_hardened_s3_policy # Partial mock the S3Bucket, not to make an actual connection to Amazon S3 setup_mocks_hardened_policy @@ -321,16 +296,16 @@ # to make assertions on chunks' keys config = CONFIG_TIME_SLICE.gsub(/check_object true/, "check_object false\n") config = config.gsub(/check_bucket true/, "check_bucket false\n") d = create_time_sliced_driver(config) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end - # Finally, the instance of S3Output is initialized and then invoked - d.run Zlib::GzipReader.open(s3_local_file_path) do |gz| data = gz.read assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n], data @@ -342,20 +317,18 @@ # Partial mock the S3Bucket, not to make an actual connection to Amazon S3 setup_mocks(true) s3_local_file_path = "/tmp/s3-test.txt" setup_s3_object_mocks(s3_local_file_path: s3_local_file_path) - # We must use TimeSlicedOutputTestDriver instead of BufferedOutputTestDriver, - # to make assertions on chunks' keys d = create_time_sliced_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end - # Finally, the instance of S3Output is initialized and then invoked - d.run Zlib::GzipReader.open(s3_local_file_path) do |gz| data = gz.read assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n], data @@ -379,21 +352,19 @@ s3_local_file_path = "/tmp/s3-test.txt" s3path = "log/events/ts=20110102-13/events_0-#{uuid}.gz" setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: s3path) - # We must use TimeSlicedOutputTestDriver instead of BufferedOutputTestDriver, - # to make assertions on chunks' keys config = CONFIG_TIME_SLICE.gsub(/%{hostname}/,"%{uuid_flush}") d = create_time_sliced_driver(config) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end - # Finally, the instance of S3Output is initialized and then invoked - d.run Zlib::GzipReader.open(s3_local_file_path) do |gz| data = gz.read assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n], data @@ -417,57 +388,58 @@ s3path = "log/events/ts=20110102-13/events_0-#{hex_random}.gz" s3_local_file_path = "/tmp/s3-test.txt" setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: s3path) d = create_time_sliced_driver(config) - stub(d.instance).unique_hex { unique_hex } + stub(Fluent::UniqueId).hex(anything) { unique_hex } - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end - # Finally, the instance of S3Output is initialized and then invoked - d.run Zlib::GzipReader.open(s3_local_file_path) do |gz| data = gz.read assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n], data end FileUtils.rm_f(s3_local_file_path) end def setup_mocks(exists_return = false) - @s3_client = stub(Aws::S3::Client.new(:stub_responses => true)) + @s3_client = stub(Aws::S3::Client.new(stub_responses: true)) mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client } - @s3_resource = mock(Aws::S3::Resource.new(:client => @s3_client)) - mock(Aws::S3::Resource).new(:client => @s3_client) { @s3_resource } - @s3_bucket = mock(Aws::S3::Bucket.new(:name => "test", - :client => @s3_client)) + @s3_resource = mock(Aws::S3::Resource.new(client: @s3_client)) + mock(Aws::S3::Resource).new(client: @s3_client) { @s3_resource } + @s3_bucket = mock(Aws::S3::Bucket.new(name: "test", + client: @s3_client)) @s3_bucket.exists? { exists_return } - @s3_object = mock(Aws::S3::Object.new(:bucket_name => "test_bucket", - :key => "test", - :client => @s3_client)) + @s3_object = mock(Aws::S3::Object.new(bucket_name: "test_bucket", + key: "test", + client: @s3_client)) + @s3_object.exists?.at_least(0) { false } @s3_bucket.object(anything).at_least(0) { @s3_object } @s3_resource.bucket(anything) { @s3_bucket } end def setup_s3_object_mocks(params = {}) s3path = params[:s3path] || "log/events/ts=20110102-13/events_0-#{Socket.gethostname}.gz" s3_local_file_path = params[:s3_local_file_path] || "/tmp/s3-test.txt" # Assert content of event logs which are being sent to S3 - s3obj = stub(Aws::S3::Object.new(:bucket_name => "test_bucket", - :key => "test", - :client => @s3_client)) + s3obj = stub(Aws::S3::Object.new(bucket_name: "test_bucket", + key: "test", + client: @s3_client)) s3obj.exists? { false } tempfile = File.new(s3_local_file_path, "w") - stub(Tempfile).new("s3-", nil) { tempfile } - s3obj.put(:body => tempfile, - :content_type => "application/x-gzip", - :storage_class => "STANDARD") + stub(Tempfile).new("s3-") { tempfile } + s3obj.put(body: tempfile, + content_type: "application/x-gzip", + storage_class: "STANDARD") @s3_bucket.object(s3path) { s3obj } end def setup_mocks_hardened_policy() @@ -483,20 +455,19 @@ @s3_bucket.object(anything).at_least(0) { @s3_object } @s3_resource.bucket(anything) { @s3_bucket } end def setup_s3_object_mocks_hardened_policy(params = {}) - s3path = params[:s3path] || "log/20110102_131415.gz" s3_local_file_path = params[:s3_local_file_path] || "/tmp/s3-test.txt" # Assert content of event logs which are being sent to S3 s3obj = stub(Aws::S3::Object.new(:bucket_name => "test_bucket", :key => "test", :client => @s3_client)) tempfile = File.new(s3_local_file_path, "w") - stub(Tempfile).new("s3-", nil) { tempfile } + stub(Tempfile).new("s3-") { tempfile } s3obj.put(:body => tempfile, :content_type => "application/x-gzip", :storage_class => "STANDARD") end @@ -504,58 +475,59 @@ setup_mocks config = CONFIG_TIME_SLICE + 'auto_create_bucket false' d = create_time_sliced_driver(config) assert_raise(RuntimeError, "The specified bucket does not exist: bucket = test_bucket") { - d.run + d.run {} } end def test_auto_create_bucket_true_with_non_existence_bucket setup_mocks - @s3_resource.create_bucket(:bucket => "test_bucket") + @s3_resource.create_bucket(bucket: "test_bucket") config = CONFIG_TIME_SLICE + 'auto_create_bucket true' d = create_time_sliced_driver(config) - assert_nothing_raised { d.run } + assert_nothing_raised { d.run {} } end def test_credentials d = create_time_sliced_driver - assert_nothing_raised{ d.run } + assert_nothing_raised { d.run {} } client = d.instance.instance_variable_get(:@s3).client credentials = client.config.credentials assert_instance_of(Aws::Credentials, credentials) end def test_assume_role_credentials expected_credentials = Aws::Credentials.new("test_key", "test_secret") - mock(Aws::AssumeRoleCredentials).new(:role_arn => "test_arn", - :role_session_name => "test_session"){ + mock(Aws::AssumeRoleCredentials).new(role_arn: "test_arn", + role_session_name: "test_session", + client: anything){ expected_credentials } config = CONFIG_TIME_SLICE.split("\n").reject{|x| x =~ /.+aws_.+/}.join("\n") config += %[ <assume_role_credentials> role_arn test_arn role_session_name test_session </assume_role_credentials> ] d = create_time_sliced_driver(config) - assert_nothing_raised{ d.run } + assert_nothing_raised { d.run {} } client = d.instance.instance_variable_get(:@s3).client credentials = client.config.credentials assert_equal(expected_credentials, credentials) end def test_assume_role_credentials_with_region expected_credentials = Aws::Credentials.new("test_key", "test_secret") - sts_client = Aws::STS::Client.new(:region => 'ap-northeast-1') - mock(Aws::STS::Client).new(:region => 'ap-northeast-1'){ sts_client } - mock(Aws::AssumeRoleCredentials).new(:role_arn => "test_arn", - :role_session_name => "test_session", - :client => sts_client){ + sts_client = Aws::STS::Client.new(region: 'ap-northeast-1') + mock(Aws::STS::Client).new(region: 'ap-northeast-1'){ sts_client } + mock(Aws::AssumeRoleCredentials).new(role_arn: "test_arn", + role_session_name: "test_session", + client: sts_client){ expected_credentials } config = CONFIG_TIME_SLICE.split("\n").reject{|x| x =~ /.+aws_.+/}.join("\n") config += %[ s3_region ap-northeast-1 @@ -563,11 +535,11 @@ role_arn test_arn role_session_name test_session </assume_role_credentials> ] d = create_time_sliced_driver(config) - assert_nothing_raised{ d.run } + assert_nothing_raised { d.run {} } client = d.instance.instance_variable_get(:@s3).client credentials = client.config.credentials assert_equal(expected_credentials, credentials) end @@ -578,11 +550,11 @@ config += %[ <instance_profile_credentials> </instance_profile_credentials> ] d = create_time_sliced_driver(config) - assert_nothing_raised{ d.run } + assert_nothing_raised { d.run {} } client = d.instance.instance_variable_get(:@s3).client credentials = client.config.credentials assert_equal(expected_credentials, credentials) end @@ -595,27 +567,27 @@ config += %[ <instance_profile_credentials> </instance_profile_credentials> ] d = create_time_sliced_driver(config) - assert_nothing_raised{ d.run } + assert_nothing_raised { d.run {} } client = d.instance.instance_variable_get(:@s3).client credentials = client.config.credentials assert_equal(expected_credentials, credentials) ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"] = nil end def test_instance_profile_credentials_aws_iam_retries expected_credentials = Aws::Credentials.new("test_key", "test_secret") - mock(Aws::InstanceProfileCredentials).new({}).returns(expected_credentials) + mock(Aws::InstanceProfileCredentials).new({ retries: 10 }).returns(expected_credentials) config = CONFIG_TIME_SLICE.split("\n").reject{|x| x =~ /.+aws_.+/}.join("\n") config += %[ aws_iam_retries 10 ] d = create_time_sliced_driver(config) - assert_nothing_raised{ d.run } + assert_nothing_raised { d.run {} } client = d.instance.instance_variable_get(:@s3).client credentials = client.config.credentials assert_equal(expected_credentials, credentials) end @@ -626,11 +598,11 @@ config += %[ <shared_credentials> </shared_credentials> ] d = create_time_sliced_driver(config) - assert_nothing_raised{ d.run } + assert_nothing_raised { d.run {} } client = d.instance.instance_variable_get(:@s3).client credentials = client.config.credentials assert_equal(expected_credentials, credentials) end @@ -648,21 +620,21 @@ setup_s3_object_mocks(s3_local_file_path: s3_local_file_path) config = CONFIG_TIME_SLICE + 'warn_for_delay 1d' d = create_time_sliced_driver(config) - delayed_time = Time.parse("2011-01-02 13:14:15 UTC") - now = delayed_time + 86000 + 1 - Timecop.freeze(now) - - d.emit({"a"=>1}, delayed_time.to_i) - d.emit({"a"=>2}, delayed_time.to_i) - - d.run - - logs = d.instance.log.logs + delayed_time = event_time("2011-01-02 13:14:15 UTC") + now = delayed_time.to_i + 86000 + 1 + d.instance.log.out.flush_logs = false + Timecop.freeze(Time.at(now)) do + d.run(default_tag: "test") do + d.feed(delayed_time, {"a"=>1}) + d.feed(delayed_time, {"a"=>2}) + end + end + logs = d.instance.log.out.logs assert_true logs.any? {|log| log.include?('out_s3: delayed events were put') } - - Timecop.return + d.instance.log.out.flush_logs = true + d.instance.log.out.reset FileUtils.rm_f(s3_local_file_path) end end