spec/inputs/s3_spec.rb: logstash-input-s3 3.5.0 vs logstash-input-s3 3.6.0
- lines removed (3.5.0 only)
+ lines added (3.6.0 only)
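
The 3.6.0 spec changes revolve around a new listing cutoff: the file introduces LogStash::Inputs::S3::CUTOFF_SECOND and expects objects whose last_modified falls inside that window to be skipped on the current polling cycle and picked up on the next one. list_new_files also now returns object summaries rather than bare keys, which is why the expectations below map over .key. A minimal sketch of the cutoff idea, assuming CUTOFF_SECOND is a few seconds (the diff only references the constant by name; this is not the plugin's actual implementation):

  CUTOFF_SECOND = 3 # assumed value for illustration only

  def within_cutoff?(object, now = Time.now)
    # Objects modified this close to "now" may still be in flight, so the
    # specs expect them to be deferred to the next polling cycle.
    object.last_modified > now - CUTOFF_SECOND
  end

  def list_candidates(objects, now = Time.now)
    # Keep only objects safely older than the cutoff, oldest first
    # (mirroring the TWO_DAYS_AGO, YESTERDAY, TODAY_BEFORE_CUTOFF ordering
    # asserted further down).
    objects.reject { |o| within_cutoff?(o, now) }
           .sort_by(&:last_modified)
  end
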
@@ -22,23 +22,25 @@
"bucket" => "logstash-test",
"temporary_directory" => temporary_directory,
"sincedb_path" => File.join(sincedb_path, ".sincedb")
}
}
+ let(:cutoff) { LogStash::Inputs::S3::CUTOFF_SECOND }
before do
FileUtils.mkdir_p(sincedb_path)
Aws.config[:stub_responses] = true
Thread.abort_on_exception = true
end
context "when interrupting the plugin" do
- let(:config) { super.merge({ "interval" => 5 }) }
+ let(:config) { super().merge({ "interval" => 5 }) }
+ let(:s3_obj) { double(:key => "awesome-key", :last_modified => Time.now.round, :content_length => 10, :storage_class => 'STANDARD', :object => double(:data => double(:restore => nil)) ) }
before do
- expect_any_instance_of(LogStash::Inputs::S3).to receive(:list_new_files).and_return(TestInfiniteS3Object.new)
+ expect_any_instance_of(LogStash::Inputs::S3).to receive(:list_new_files).and_return(TestInfiniteS3Object.new(s3_obj))
end
it_behaves_like "an interruptible input plugin"
end
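
Two mechanical changes in this hunk recur through the rest of the diff. Bare super inside let blocks becomes super(): RSpec defines let helpers with define_method, and Ruby rejects implicit-argument super from methods defined that way, so the explicit zero-argument form is required. TestInfiniteS3Object also now receives the object double it should yield instead of fabricating its own. Its real implementation lives in the spec support files, not in this diff; a hypothetical shape that would satisfy the interrupt test looks like this:

  class TestInfiniteS3Object
    include Enumerable

    def initialize(s3_obj)
      @s3_obj = s3_obj
    end

    # Yield the injected object double forever; the interrupt spec only
    # needs an endless stream of "new" files to iterate over.
    def each
      loop { yield @s3_obj }
    end
  end
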
@@ -113,53 +115,57 @@
end
describe "#list_new_files" do
before { allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { objects_list } }
- let!(:present_object) {double(:key => 'this-should-be-present', :last_modified => Time.now, :content_length => 10, :storage_class => 'STANDARD', :object => double(:data => double(:restore => nil)) ) }
- let!(:archived_object) {double(:key => 'this-should-be-archived', :last_modified => Time.now, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => nil)) ) }
- let!(:deep_archived_object) {double(:key => 'this-should-be-archived', :last_modified => Time.now, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => nil)) ) }
- let!(:restored_object) {double(:key => 'this-should-be-restored-from-archive', :last_modified => Time.now, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => 'ongoing-request="false", expiry-date="Thu, 01 Jan 2099 00:00:00 GMT"')) ) }
- let!(:deep_restored_object) {double(:key => 'this-should-be-restored-from-deep-archive', :last_modified => Time.now, :content_length => 10, :storage_class => 'DEEP_ARCHIVE', :object => double(:data => double(:restore => 'ongoing-request="false", expiry-date="Thu, 01 Jan 2099 00:00:00 GMT"')) ) }
+ let!(:present_object_after_cutoff) {double(:key => 'this-should-not-be-present', :last_modified => Time.now, :content_length => 10, :storage_class => 'STANDARD', :object => double(:data => double(:restore => nil)) ) }
+ let!(:present_object) {double(:key => 'this-should-be-present', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'STANDARD', :object => double(:data => double(:restore => nil)) ) }
+ let!(:archived_object) {double(:key => 'this-should-be-archived', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => nil)) ) }
+ let!(:deep_archived_object) {double(:key => 'this-should-be-archived', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => nil)) ) }
+ let!(:restored_object) {double(:key => 'this-should-be-restored-from-archive', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => 'ongoing-request="false", expiry-date="Thu, 01 Jan 2099 00:00:00 GMT"')) ) }
+ let!(:deep_restored_object) {double(:key => 'this-should-be-restored-from-deep-archive', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'DEEP_ARCHIVE', :object => double(:data => double(:restore => 'ongoing-request="false", expiry-date="Thu, 01 Jan 2099 00:00:00 GMT"')) ) }
let(:objects_list) {
[
double(:key => 'exclude-this-file-1', :last_modified => Time.now - 2 * day, :content_length => 100, :storage_class => 'STANDARD'),
double(:key => 'exclude/logstash', :last_modified => Time.now - 2 * day, :content_length => 50, :storage_class => 'STANDARD'),
archived_object,
restored_object,
deep_restored_object,
- present_object
+ present_object,
+ present_object_after_cutoff
]
}
it 'should allow user to exclude files from the s3 bucket' do
plugin = LogStash::Inputs::S3.new(config.merge({ "exclude_pattern" => "^exclude" }))
plugin.register
- files = plugin.list_new_files
+ files = plugin.list_new_files.map { |item| item.key }
expect(files).to include(present_object.key)
expect(files).to include(restored_object.key)
expect(files).to include(deep_restored_object.key)
expect(files).to_not include('exclude-this-file-1') # matches exclude pattern
expect(files).to_not include('exclude/logstash') # matches exclude pattern
expect(files).to_not include(archived_object.key) # archived
expect(files).to_not include(deep_archived_object.key) # archived
+ expect(files).to_not include(present_object_after_cutoff.key) # after cutoff
expect(files.size).to eq(3)
end
it 'should support not providing a exclude pattern' do
plugin = LogStash::Inputs::S3.new(config)
plugin.register
- files = plugin.list_new_files
+ files = plugin.list_new_files.map { |item| item.key }
expect(files).to include(present_object.key)
expect(files).to include(restored_object.key)
expect(files).to include(deep_restored_object.key)
expect(files).to include('exclude-this-file-1') # no exclude pattern given
expect(files).to include('exclude/logstash') # no exclude pattern given
expect(files).to_not include(archived_object.key) # archived
expect(files).to_not include(deep_archived_object.key) # archived
+ expect(files).to_not include(present_object_after_cutoff.key) # after cutoff
expect(files.size).to eq(5)
end
context 'when all files are excluded from a bucket' do
let(:objects_list) {
@@ -202,11 +208,11 @@
plugin = LogStash::Inputs::S3.new(config.merge({ 'backup_add_prefix' => 'mybackup',
'backup_to_bucket' => config['bucket']}))
plugin.register
- files = plugin.list_new_files
+ files = plugin.list_new_files.map { |item| item.key }
expect(files).to include(present_object.key)
expect(files).to_not include('mybackup-log-1') # matches backup prefix
expect(files.size).to eq(1)
end
end
@@ -216,18 +222,19 @@
allow_any_instance_of(LogStash::Inputs::S3::SinceDB::File).to receive(:read).and_return(Time.now - day)
plugin.register
- files = plugin.list_new_files
+ files = plugin.list_new_files.map { |item| item.key }
expect(files).to include(present_object.key)
expect(files).to include(restored_object.key)
expect(files).to include(deep_restored_object.key)
expect(files).to_not include('exclude-this-file-1') # too old
expect(files).to_not include('exclude/logstash') # too old
expect(files).to_not include(archived_object.key) # archived
expect(files).to_not include(deep_archived_object.key) # archived
+ expect(files).to_not include(present_object_after_cutoff.key) # after cutoff
expect(files.size).to eq(3)
end
it 'should ignore file if the file match the prefix' do
prefix = 'mysource/'
@@ -239,26 +246,27 @@
allow_any_instance_of(Aws::S3::Bucket).to receive(:objects).with(:prefix => prefix) { objects_list }
plugin = LogStash::Inputs::S3.new(config.merge({ 'prefix' => prefix }))
plugin.register
- expect(plugin.list_new_files).to eq([present_object.key])
+ expect(plugin.list_new_files.map { |item| item.key }).to eq([present_object.key])
end
it 'should sort return object sorted by last_modification date with older first' do
objects = [
double(:key => 'YESTERDAY', :last_modified => Time.now - day, :content_length => 5, :storage_class => 'STANDARD'),
double(:key => 'TODAY', :last_modified => Time.now, :content_length => 5, :storage_class => 'STANDARD'),
+ double(:key => 'TODAY_BEFORE_CUTOFF', :last_modified => Time.now - cutoff, :content_length => 5, :storage_class => 'STANDARD'),
double(:key => 'TWO_DAYS_AGO', :last_modified => Time.now - 2 * day, :content_length => 5, :storage_class => 'STANDARD')
]
allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { objects }
plugin = LogStash::Inputs::S3.new(config)
plugin.register
- expect(plugin.list_new_files).to eq(['TWO_DAYS_AGO', 'YESTERDAY', 'TODAY'])
+ expect(plugin.list_new_files.map { |item| item.key }).to eq(['TWO_DAYS_AGO', 'YESTERDAY', 'TODAY_BEFORE_CUTOFF'])
end
describe "when doing backup on the s3" do
it 'should copy to another s3 bucket when keeping the original file' do
plugin = LogStash::Inputs::S3.new(config.merge({ "backup_to_bucket" => "mybackup"}))
@@ -449,11 +457,11 @@
include_examples "generated events"
end
context 'compressed with gzip extension and using custom gzip_pattern option' do
- let(:config) { super.merge({ "gzip_pattern" => "gee.zip$" }) }
+ let(:config) { super().merge({ "gzip_pattern" => "gee.zip$" }) }
let(:log) { double(:key => 'log.gee.zip', :last_modified => Time.now - 2 * day, :content_length => 5, :storage_class => 'STANDARD') }
let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'compressed.log.gee.zip') }
include_examples "generated events"
end
@@ -497,11 +505,11 @@
include_examples "generated events"
end
context 'when include_object_properties is set to true' do
- let(:config) { super.merge({ "include_object_properties" => true }) }
+ let(:config) { super().merge({ "include_object_properties" => true }) }
let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'uncompressed.log') }
it 'should extract object properties onto [@metadata][s3]' do
events = fetch_events(config)
events.each do |event|
@@ -511,11 +519,11 @@
include_examples "generated events"
end
context 'when include_object_properties is set to false' do
- let(:config) { super.merge({ "include_object_properties" => false }) }
+ let(:config) { super().merge({ "include_object_properties" => false }) }
let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'uncompressed.log') }
it 'should NOT extract object properties onto [@metadata][s3]' do
events = fetch_events(config)
events.each do |event|
@@ -523,8 +531,69 @@
end
end
include_examples "generated events"
end
+ end
+ describe "data loss" do
+ let(:s3_plugin) { LogStash::Inputs::S3.new(config) }
+ let(:queue) { [] }
+
+ before do
+ s3_plugin.register
+ end
+
+ context 'events come after cutoff time' do
+ it 'should be processed in next cycle' do
+ s3_objects = [
+ double(:key => 'TWO_DAYS_AGO', :last_modified => Time.now.round - 2 * day, :content_length => 5, :storage_class => 'STANDARD'),
+ double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'),
+ double(:key => 'TODAY_BEFORE_CUTOFF', :last_modified => Time.now.round - cutoff, :content_length => 5, :storage_class => 'STANDARD'),
+ double(:key => 'TODAY', :last_modified => Time.now.round, :content_length => 5, :storage_class => 'STANDARD'),
+ double(:key => 'TODAY', :last_modified => Time.now.round, :content_length => 5, :storage_class => 'STANDARD')
+ ]
+ size = s3_objects.length
+
+ allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { s3_objects }
+ allow_any_instance_of(Aws::S3::Bucket).to receive(:object).and_return(*s3_objects)
+ expect(s3_plugin).to receive(:process_log).at_least(size).and_call_original
+ expect(s3_plugin).to receive(:stop?).and_return(false).at_least(size)
+ expect(s3_plugin).to receive(:download_remote_file).and_return(true).at_least(size)
+ expect(s3_plugin).to receive(:process_local_log).and_return(true).at_least(size)
+
+ # first iteration
+ s3_plugin.process_files(queue)
+
+ # second iteration
+ sleep(cutoff + 1)
+ s3_plugin.process_files(queue)
+ end
+ end
+
+ context 's3 object updated after getting summary' do
+ it 'should not update sincedb' do
+ s3_summary = [
+ double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'),
+ double(:key => 'TODAY', :last_modified => Time.now.round - (cutoff * 10), :content_length => 5, :storage_class => 'STANDARD')
+ ]
+
+ s3_objects = [
+ double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'),
+ double(:key => 'TODAY_UPDATED', :last_modified => Time.now.round, :content_length => 5, :storage_class => 'STANDARD')
+ ]
+
+ size = s3_objects.length
+
+ allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { s3_summary }
+ allow_any_instance_of(Aws::S3::Bucket).to receive(:object).and_return(*s3_objects)
+ expect(s3_plugin).to receive(:process_log).at_least(size).and_call_original
+ expect(s3_plugin).to receive(:stop?).and_return(false).at_least(size)
+ expect(s3_plugin).to receive(:download_remote_file).and_return(true).at_least(size)
+ expect(s3_plugin).to receive(:process_local_log).and_return(true).at_least(size)
+
+ s3_plugin.process_files(queue)
+ expect(s3_plugin.send(:sincedb).read).to eq(s3_summary[0].last_modified)
+ end
+ end
end
end
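
The new data loss examples pin down the behaviour behind the cutoff. The first shows that an object stamped inside the cutoff window is skipped by process_files and handled on the following cycle, which is why the example sleeps for cutoff + 1 seconds between the two iterations. The second asserts that when an object changes between the bucket listing and the per-key fetch (TODAY vs TODAY_UPDATED), the sincedb is left at the last summary whose timestamp could be trusted, YESTERDAY in this case. A hypothetical illustration of that guard, with made-up helper names rather than the plugin's actual code:

  def record_progress(sincedb, summary, downloaded_object)
    # If the object changed between listing and download, leave the
    # watermark alone; the updated object will be listed again on a
    # later cycle instead of being silently skipped.
    return unless downloaded_object.last_modified == summary.last_modified

    sincedb.write(summary.last_modified) if summary.last_modified > sincedb.read
  end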