test/test_out_s3.rb in fluent-plugin-s3-0.8.8 vs test/test_out_s3.rb in fluent-plugin-s3-1.0.0.rc1
- old
+ new
@@ -1,17 +1,23 @@
require 'fluent/test'
+require 'fluent/test/helpers'
+require 'fluent/test/log'
+require 'fluent/test/driver/output'
+require 'aws-sdk-resources'
require 'fluent/plugin/out_s3'
require 'test/unit/rr'
require 'zlib'
require 'fileutils'
require 'timecop'
+require 'uuidtools'
+include Fluent::Test::Helpers
+
class S3OutputTest < Test::Unit::TestCase
def setup
- require 'aws-sdk-resources'
- Fluent::Test.setup
+# Fluent::Test.setup
end
def teardown
Dir.glob('test/tmp/*').each {|file| FileUtils.rm_f(file) }
end
@@ -21,14 +27,19 @@
aws_sec_key test_sec_key
s3_bucket test_bucket
path log
utc
buffer_type memory
+ time_slice_format %Y%m%d-%H
]
def create_driver(conf = CONFIG)
- Fluent::Test::TimeSlicedOutputTestDriver.new(Fluent::S3Output) do
+ Fluent::Test::Driver::Output.new(Fluent::Plugin::S3Output) do
+ def format(tag, time, record)
+ super
+ end
+
def write(chunk)
chunk.read
end
private
@@ -64,11 +75,11 @@
data('US West (Oregon)' => 's3-us-west-2.amazonaws.com',
'EU (Frankfurt)' => 's3.eu-central-1.amazonaws.com',
'Asia Pacific (Tokyo)' => 's3-ap-northeast-1.amazonaws.com')
def test_s3_endpoint_with_invalid_endpoint(endpoint)
assert_raise(Fluent::ConfigError, "s3_endpoint parameter is not supported, use s3_region instead. This parameter is for S3 compatible services") {
- d = create_driver(CONFIG + "s3_endpoint #{endpoint}")
+ create_driver(CONFIG + "s3_endpoint #{endpoint}")
}
end
def test_configure_with_mime_type_json
conf = CONFIG.clone
@@ -127,186 +138,150 @@
d = create_driver(conf)
assert_equal false, d.instance.check_bucket
assert_equal false, d.instance.check_object
end
- def test_config_with_hostname_placeholder
- d = create_driver(<<EOC)
- aws_key_id test_key_id
- aws_sec_key test_sec_key
- s3_bucket test_bucket
- path log/%{hostname}/test
- s3_object_key_format %{path}%{hostname}_%{index}
- buffer_type memory
-EOC
- assert_match /#{Socket.gethostname}/, d.instance.s3_object_key_format
- assert_match /#{Socket.gethostname}/, d.instance.path
- end
-
- def test_configure_with_grant
- conf = CONFIG.clone
- conf << "\grant_full_control id='0123456789'\ngrant_read id='1234567890'\ngrant_read_acp id='2345678901'\ngrant_write_acp id='3456789012'\n"
- d = create_driver(conf)
- assert_equal "id='0123456789'", d.instance.grant_full_control
- assert_equal "id='1234567890'", d.instance.grant_read
- assert_equal "id='2345678901'", d.instance.grant_read_acp
- assert_equal "id='3456789012'", d.instance.grant_write_acp
- end
-
- def test_path_slicing
- config = CONFIG.clone.gsub(/path\slog/, "path log/%Y/%m/%d")
- d = create_driver(config)
- path_slicer = d.instance.instance_variable_get(:@path_slicer)
- path = d.instance.instance_variable_get(:@path)
- slice = path_slicer.call(path)
- assert_equal slice, Time.now.utc.strftime("log/%Y/%m/%d")
- end
-
- def test_path_slicing_utc
- config = CONFIG.clone.gsub(/path\slog/, "path log/%Y/%m/%d")
- config << "\nutc\n"
- d = create_driver(config)
- path_slicer = d.instance.instance_variable_get(:@path_slicer)
- path = d.instance.instance_variable_get(:@path)
- slice = path_slicer.call(path)
- assert_equal slice, Time.now.utc.strftime("log/%Y/%m/%d")
- end
-
def test_format
d = create_driver
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.emit({"a"=>1}, time)
- d.emit({"a"=>2}, time)
-
- d.expect_format %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n]
- d.expect_format %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]
-
- d.run
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, { "a" => 1 })
+ d.feed(time, { "a" => 2 })
+ end
+ expected = [
+ %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n],
+ %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]
+ ]
+ assert_equal(expected, d.formatted)
end
def test_format_included_tag_and_time
config = [CONFIG, 'include_tag_key true', 'include_time_key true'].join("\n")
d = create_driver(config)
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.emit({"a"=>1}, time)
- d.emit({"a"=>2}, time)
-
- d.expect_format %[2011-01-02T13:14:15Z\ttest\t{"a":1,"tag":"test","time":"2011-01-02T13:14:15Z"}\n]
- d.expect_format %[2011-01-02T13:14:15Z\ttest\t{"a":2,"tag":"test","time":"2011-01-02T13:14:15Z"}\n]
-
- d.run
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, { "a" => 1 })
+ d.feed(time, { "a" => 2 })
+ end
+ expected = [
+ %[2011-01-02T13:14:15Z\ttest\t{"a":1,"tag":"test","time":"2011-01-02T13:14:15Z"}\n],
+ %[2011-01-02T13:14:15Z\ttest\t{"a":2,"tag":"test","time":"2011-01-02T13:14:15Z"}\n]
+ ]
+ assert_equal(expected, d.formatted)
end
def test_format_with_format_ltsv
config = [CONFIG, 'format ltsv'].join("\n")
d = create_driver(config)
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.emit({"a"=>1, "b"=>1}, time)
- d.emit({"a"=>2, "b"=>2}, time)
-
- d.expect_format %[a:1\tb:1\n]
- d.expect_format %[a:2\tb:2\n]
-
- d.run
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, {"a"=>1, "b"=>1})
+ d.feed(time, {"a"=>2, "b"=>2})
+ end
+ expected = [
+ %[a:1\tb:1\n],
+ %[a:2\tb:2\n]
+ ]
+ assert_equal(expected, d.formatted)
end
def test_format_with_format_json
config = [CONFIG, 'format json'].join("\n")
d = create_driver(config)
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.emit({"a"=>1}, time)
- d.emit({"a"=>2}, time)
-
- d.expect_format %[{"a":1}\n]
- d.expect_format %[{"a":2}\n]
-
- d.run
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, {"a"=>1})
+ d.feed(time, {"a"=>2})
+ end
+ expected = [
+ %[{"a":1}\n],
+ %[{"a":2}\n]
+ ]
+ assert_equal(expected, d.formatted)
end
def test_format_with_format_json_included_tag
config = [CONFIG, 'format json', 'include_tag_key true'].join("\n")
d = create_driver(config)
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.emit({"a"=>1}, time)
- d.emit({"a"=>2}, time)
-
- d.expect_format %[{"a":1,"tag":"test"}\n]
- d.expect_format %[{"a":2,"tag":"test"}\n]
-
- d.run
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, {"a"=>1})
+ d.feed(time, {"a"=>2})
+ end
+ expected = [
+ %[{"a":1,"tag":"test"}\n],
+ %[{"a":2,"tag":"test"}\n]
+ ]
+ assert_equal(expected, d.formatted)
end
def test_format_with_format_json_included_time
config = [CONFIG, 'format json', 'include_time_key true'].join("\n")
d = create_driver(config)
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.emit({"a"=>1}, time)
- d.emit({"a"=>2}, time)
-
- d.expect_format %[{"a":1,"time":"2011-01-02T13:14:15Z"}\n]
- d.expect_format %[{"a":2,"time":"2011-01-02T13:14:15Z"}\n]
-
- d.run
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, {"a"=>1})
+ d.feed(time, {"a"=>2})
+ end
+ expected = [
+ %[{"a":1,"time":"2011-01-02T13:14:15Z"}\n],
+ %[{"a":2,"time":"2011-01-02T13:14:15Z"}\n]
+ ]
+ assert_equal(expected, d.formatted)
end
def test_format_with_format_json_included_tag_and_time
config = [CONFIG, 'format json', 'include_tag_key true', 'include_time_key true'].join("\n")
d = create_driver(config)
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.emit({"a"=>1}, time)
- d.emit({"a"=>2}, time)
-
- d.expect_format %[{"a":1,"tag":"test","time":"2011-01-02T13:14:15Z"}\n]
- d.expect_format %[{"a":2,"tag":"test","time":"2011-01-02T13:14:15Z"}\n]
-
- d.run
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, {"a"=>1})
+ d.feed(time, {"a"=>2})
+ end
+ expected = [
+ %[{"a":1,"tag":"test","time":"2011-01-02T13:14:15Z"}\n],
+ %[{"a":2,"tag":"test","time":"2011-01-02T13:14:15Z"}\n]
+ ]
+ assert_equal(expected, d.formatted)
end
- def test_chunk_to_write
- d = create_driver
-
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.emit({"a"=>1}, time)
- d.emit({"a"=>2}, time)
-
- # S3OutputTest#write returns chunk.read
- data = d.run
-
- assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
- %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n],
- data.first
- end
-
CONFIG_TIME_SLICE = %[
aws_key_id test_key_id
aws_sec_key test_sec_key
s3_bucket test_bucket
s3_object_key_format %{path}/events/ts=%{time_slice}/events_%{index}-%{hostname}.%{file_extension}
time_slice_format %Y%m%d-%H
path log
utc
buffer_type memory
- log_level debug
+ @log_level debug
check_bucket true
check_object true
]
def create_time_sliced_driver(conf = CONFIG_TIME_SLICE)
- d = Fluent::Test::TimeSlicedOutputTestDriver.new(Fluent::S3Output) do
+ Fluent::Test::Driver::Output.new(Fluent::Plugin::S3Output) do
+ def format(tag, time, record)
+ super
+ end
+
+ def write(chunk)
+ super
+ end
+
private
def check_apikeys
end
end.configure(conf)
- d
end
def test_write_with_hardened_s3_policy
# Partial mock the S3Bucket, not to make an actual connection to Amazon S3
setup_mocks_hardened_policy
@@ -321,16 +296,16 @@
# to make assertions on chunks' keys
config = CONFIG_TIME_SLICE.gsub(/check_object true/, "check_object false\n")
config = config.gsub(/check_bucket true/, "check_bucket false\n")
d = create_time_sliced_driver(config)
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.emit({"a"=>1}, time)
- d.emit({"a"=>2}, time)
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, {"a"=>1})
+ d.feed(time, {"a"=>2})
+ end
- # Finally, the instance of S3Output is initialized and then invoked
- d.run
Zlib::GzipReader.open(s3_local_file_path) do |gz|
data = gz.read
assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
%[2011-01-02T13:14:15Z\ttest\t{"a":2}\n],
data
@@ -342,20 +317,18 @@
# Partial mock the S3Bucket, not to make an actual connection to Amazon S3
setup_mocks(true)
s3_local_file_path = "/tmp/s3-test.txt"
setup_s3_object_mocks(s3_local_file_path: s3_local_file_path)
- # We must use TimeSlicedOutputTestDriver instead of BufferedOutputTestDriver,
- # to make assertions on chunks' keys
d = create_time_sliced_driver
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.emit({"a"=>1}, time)
- d.emit({"a"=>2}, time)
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, {"a"=>1})
+ d.feed(time, {"a"=>2})
+ end
- # Finally, the instance of S3Output is initialized and then invoked
- d.run
Zlib::GzipReader.open(s3_local_file_path) do |gz|
data = gz.read
assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
%[2011-01-02T13:14:15Z\ttest\t{"a":2}\n],
data
@@ -379,21 +352,19 @@
s3_local_file_path = "/tmp/s3-test.txt"
s3path = "log/events/ts=20110102-13/events_0-#{uuid}.gz"
setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: s3path)
- # We must use TimeSlicedOutputTestDriver instead of BufferedOutputTestDriver,
- # to make assertions on chunks' keys
config = CONFIG_TIME_SLICE.gsub(/%{hostname}/,"%{uuid_flush}")
d = create_time_sliced_driver(config)
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.emit({"a"=>1}, time)
- d.emit({"a"=>2}, time)
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, {"a"=>1})
+ d.feed(time, {"a"=>2})
+ end
- # Finally, the instance of S3Output is initialized and then invoked
- d.run
Zlib::GzipReader.open(s3_local_file_path) do |gz|
data = gz.read
assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
%[2011-01-02T13:14:15Z\ttest\t{"a":2}\n],
data
@@ -417,57 +388,58 @@
s3path = "log/events/ts=20110102-13/events_0-#{hex_random}.gz"
s3_local_file_path = "/tmp/s3-test.txt"
setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: s3path)
d = create_time_sliced_driver(config)
- stub(d.instance).unique_hex { unique_hex }
+ stub(Fluent::UniqueId).hex(anything) { unique_hex }
- time = Time.parse("2011-01-02 13:14:15 UTC").to_i
- d.emit({"a"=>1}, time)
- d.emit({"a"=>2}, time)
+ time = event_time("2011-01-02 13:14:15 UTC")
+ d.run(default_tag: "test") do
+ d.feed(time, {"a"=>1})
+ d.feed(time, {"a"=>2})
+ end
- # Finally, the instance of S3Output is initialized and then invoked
- d.run
Zlib::GzipReader.open(s3_local_file_path) do |gz|
data = gz.read
assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
%[2011-01-02T13:14:15Z\ttest\t{"a":2}\n],
data
end
FileUtils.rm_f(s3_local_file_path)
end
def setup_mocks(exists_return = false)
- @s3_client = stub(Aws::S3::Client.new(:stub_responses => true))
+ @s3_client = stub(Aws::S3::Client.new(stub_responses: true))
mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client }
- @s3_resource = mock(Aws::S3::Resource.new(:client => @s3_client))
- mock(Aws::S3::Resource).new(:client => @s3_client) { @s3_resource }
- @s3_bucket = mock(Aws::S3::Bucket.new(:name => "test",
- :client => @s3_client))
+ @s3_resource = mock(Aws::S3::Resource.new(client: @s3_client))
+ mock(Aws::S3::Resource).new(client: @s3_client) { @s3_resource }
+ @s3_bucket = mock(Aws::S3::Bucket.new(name: "test",
+ client: @s3_client))
@s3_bucket.exists? { exists_return }
- @s3_object = mock(Aws::S3::Object.new(:bucket_name => "test_bucket",
- :key => "test",
- :client => @s3_client))
+ @s3_object = mock(Aws::S3::Object.new(bucket_name: "test_bucket",
+ key: "test",
+ client: @s3_client))
+ @s3_object.exists?.at_least(0) { false }
@s3_bucket.object(anything).at_least(0) { @s3_object }
@s3_resource.bucket(anything) { @s3_bucket }
end
def setup_s3_object_mocks(params = {})
s3path = params[:s3path] || "log/events/ts=20110102-13/events_0-#{Socket.gethostname}.gz"
s3_local_file_path = params[:s3_local_file_path] || "/tmp/s3-test.txt"
# Assert content of event logs which are being sent to S3
- s3obj = stub(Aws::S3::Object.new(:bucket_name => "test_bucket",
- :key => "test",
- :client => @s3_client))
+ s3obj = stub(Aws::S3::Object.new(bucket_name: "test_bucket",
+ key: "test",
+ client: @s3_client))
s3obj.exists? { false }
tempfile = File.new(s3_local_file_path, "w")
- stub(Tempfile).new("s3-", nil) { tempfile }
- s3obj.put(:body => tempfile,
- :content_type => "application/x-gzip",
- :storage_class => "STANDARD")
+ stub(Tempfile).new("s3-") { tempfile }
+ s3obj.put(body: tempfile,
+ content_type: "application/x-gzip",
+ storage_class: "STANDARD")
@s3_bucket.object(s3path) { s3obj }
end
def setup_mocks_hardened_policy()
@@ -483,20 +455,19 @@
@s3_bucket.object(anything).at_least(0) { @s3_object }
@s3_resource.bucket(anything) { @s3_bucket }
end
def setup_s3_object_mocks_hardened_policy(params = {})
- s3path = params[:s3path] || "log/20110102_131415.gz"
s3_local_file_path = params[:s3_local_file_path] || "/tmp/s3-test.txt"
# Assert content of event logs which are being sent to S3
s3obj = stub(Aws::S3::Object.new(:bucket_name => "test_bucket",
:key => "test",
:client => @s3_client))
tempfile = File.new(s3_local_file_path, "w")
- stub(Tempfile).new("s3-", nil) { tempfile }
+ stub(Tempfile).new("s3-") { tempfile }
s3obj.put(:body => tempfile,
:content_type => "application/x-gzip",
:storage_class => "STANDARD")
end
@@ -504,58 +475,59 @@
setup_mocks
config = CONFIG_TIME_SLICE + 'auto_create_bucket false'
d = create_time_sliced_driver(config)
assert_raise(RuntimeError, "The specified bucket does not exist: bucket = test_bucket") {
- d.run
+ d.run {}
}
end
def test_auto_create_bucket_true_with_non_existence_bucket
setup_mocks
- @s3_resource.create_bucket(:bucket => "test_bucket")
+ @s3_resource.create_bucket(bucket: "test_bucket")
config = CONFIG_TIME_SLICE + 'auto_create_bucket true'
d = create_time_sliced_driver(config)
- assert_nothing_raised { d.run }
+ assert_nothing_raised { d.run {} }
end
def test_credentials
d = create_time_sliced_driver
- assert_nothing_raised{ d.run }
+ assert_nothing_raised { d.run {} }
client = d.instance.instance_variable_get(:@s3).client
credentials = client.config.credentials
assert_instance_of(Aws::Credentials, credentials)
end
def test_assume_role_credentials
expected_credentials = Aws::Credentials.new("test_key", "test_secret")
- mock(Aws::AssumeRoleCredentials).new(:role_arn => "test_arn",
- :role_session_name => "test_session"){
+ mock(Aws::AssumeRoleCredentials).new(role_arn: "test_arn",
+ role_session_name: "test_session",
+ client: anything){
expected_credentials
}
config = CONFIG_TIME_SLICE.split("\n").reject{|x| x =~ /.+aws_.+/}.join("\n")
config += %[
<assume_role_credentials>
role_arn test_arn
role_session_name test_session
</assume_role_credentials>
]
d = create_time_sliced_driver(config)
- assert_nothing_raised{ d.run }
+ assert_nothing_raised { d.run {} }
client = d.instance.instance_variable_get(:@s3).client
credentials = client.config.credentials
assert_equal(expected_credentials, credentials)
end
def test_assume_role_credentials_with_region
expected_credentials = Aws::Credentials.new("test_key", "test_secret")
- sts_client = Aws::STS::Client.new(:region => 'ap-northeast-1')
- mock(Aws::STS::Client).new(:region => 'ap-northeast-1'){ sts_client }
- mock(Aws::AssumeRoleCredentials).new(:role_arn => "test_arn",
- :role_session_name => "test_session",
- :client => sts_client){
+ sts_client = Aws::STS::Client.new(region: 'ap-northeast-1')
+ mock(Aws::STS::Client).new(region: 'ap-northeast-1'){ sts_client }
+ mock(Aws::AssumeRoleCredentials).new(role_arn: "test_arn",
+ role_session_name: "test_session",
+ client: sts_client){
expected_credentials
}
config = CONFIG_TIME_SLICE.split("\n").reject{|x| x =~ /.+aws_.+/}.join("\n")
config += %[
s3_region ap-northeast-1
@@ -563,11 +535,11 @@
role_arn test_arn
role_session_name test_session
</assume_role_credentials>
]
d = create_time_sliced_driver(config)
- assert_nothing_raised{ d.run }
+ assert_nothing_raised { d.run {} }
client = d.instance.instance_variable_get(:@s3).client
credentials = client.config.credentials
assert_equal(expected_credentials, credentials)
end
@@ -578,11 +550,11 @@
config += %[
<instance_profile_credentials>
</instance_profile_credentials>
]
d = create_time_sliced_driver(config)
- assert_nothing_raised{ d.run }
+ assert_nothing_raised { d.run {} }
client = d.instance.instance_variable_get(:@s3).client
credentials = client.config.credentials
assert_equal(expected_credentials, credentials)
end
@@ -595,27 +567,27 @@
config += %[
<instance_profile_credentials>
</instance_profile_credentials>
]
d = create_time_sliced_driver(config)
- assert_nothing_raised{ d.run }
+ assert_nothing_raised { d.run {} }
client = d.instance.instance_variable_get(:@s3).client
credentials = client.config.credentials
assert_equal(expected_credentials, credentials)
ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"] = nil
end
def test_instance_profile_credentials_aws_iam_retries
expected_credentials = Aws::Credentials.new("test_key", "test_secret")
- mock(Aws::InstanceProfileCredentials).new({}).returns(expected_credentials)
+ mock(Aws::InstanceProfileCredentials).new({ retries: 10 }).returns(expected_credentials)
config = CONFIG_TIME_SLICE.split("\n").reject{|x| x =~ /.+aws_.+/}.join("\n")
config += %[
aws_iam_retries 10
]
d = create_time_sliced_driver(config)
- assert_nothing_raised{ d.run }
+ assert_nothing_raised { d.run {} }
client = d.instance.instance_variable_get(:@s3).client
credentials = client.config.credentials
assert_equal(expected_credentials, credentials)
end
@@ -626,11 +598,11 @@
config += %[
<shared_credentials>
</shared_credentials>
]
d = create_time_sliced_driver(config)
- assert_nothing_raised{ d.run }
+ assert_nothing_raised { d.run {} }
client = d.instance.instance_variable_get(:@s3).client
credentials = client.config.credentials
assert_equal(expected_credentials, credentials)
end
@@ -648,21 +620,21 @@
setup_s3_object_mocks(s3_local_file_path: s3_local_file_path)
config = CONFIG_TIME_SLICE + 'warn_for_delay 1d'
d = create_time_sliced_driver(config)
- delayed_time = Time.parse("2011-01-02 13:14:15 UTC")
- now = delayed_time + 86000 + 1
- Timecop.freeze(now)
-
- d.emit({"a"=>1}, delayed_time.to_i)
- d.emit({"a"=>2}, delayed_time.to_i)
-
- d.run
-
- logs = d.instance.log.logs
+ delayed_time = event_time("2011-01-02 13:14:15 UTC")
+ now = delayed_time.to_i + 86000 + 1
+ d.instance.log.out.flush_logs = false
+ Timecop.freeze(Time.at(now)) do
+ d.run(default_tag: "test") do
+ d.feed(delayed_time, {"a"=>1})
+ d.feed(delayed_time, {"a"=>2})
+ end
+ end
+ logs = d.instance.log.out.logs
assert_true logs.any? {|log| log.include?('out_s3: delayed events were put') }
-
- Timecop.return
+ d.instance.log.out.flush_logs = true
+ d.instance.log.out.reset
FileUtils.rm_f(s3_local_file_path)
end
end