spec/inputs/s3_spec.rb in logstash-input-s3-3.5.0 vs spec/inputs/s3_spec.rb in logstash-input-s3-3.6.0

- old
+ new

@@ -22,23 +22,25 @@
       "bucket" => "logstash-test",
       "temporary_directory" => temporary_directory,
       "sincedb_path" => File.join(sincedb_path, ".sincedb")
     }
   }
+  let(:cutoff) { LogStash::Inputs::S3::CUTOFF_SECOND }

   before do
     FileUtils.mkdir_p(sincedb_path)
     Aws.config[:stub_responses] = true
     Thread.abort_on_exception = true
   end

   context "when interrupting the plugin" do
-    let(:config) { super.merge({ "interval" => 5 }) }
+    let(:config) { super().merge({ "interval" => 5 }) }
+    let(:s3_obj) { double(:key => "awesome-key", :last_modified => Time.now.round, :content_length => 10, :storage_class => 'STANDARD', :object => double(:data => double(:restore => nil)) ) }

     before do
-      expect_any_instance_of(LogStash::Inputs::S3).to receive(:list_new_files).and_return(TestInfiniteS3Object.new)
+      expect_any_instance_of(LogStash::Inputs::S3).to receive(:list_new_files).and_return(TestInfiniteS3Object.new(s3_obj))
     end

     it_behaves_like "an interruptible input plugin"
   end
@@ -113,53 +115,57 @@
   end

   describe "#list_new_files" do
     before { allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { objects_list } }

-    let!(:present_object) {double(:key => 'this-should-be-present', :last_modified => Time.now, :content_length => 10, :storage_class => 'STANDARD', :object => double(:data => double(:restore => nil)) ) }
-    let!(:archived_object) {double(:key => 'this-should-be-archived', :last_modified => Time.now, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => nil)) ) }
-    let!(:deep_archived_object) {double(:key => 'this-should-be-archived', :last_modified => Time.now, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => nil)) ) }
-    let!(:restored_object) {double(:key => 'this-should-be-restored-from-archive', :last_modified => Time.now, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => 'ongoing-request="false", expiry-date="Thu, 01 Jan 2099 00:00:00 GMT"')) ) }
-    let!(:deep_restored_object) {double(:key => 'this-should-be-restored-from-deep-archive', :last_modified => Time.now, :content_length => 10, :storage_class => 'DEEP_ARCHIVE', :object => double(:data => double(:restore => 'ongoing-request="false", expiry-date="Thu, 01 Jan 2099 00:00:00 GMT"')) ) }
+    let!(:present_object_after_cutoff) {double(:key => 'this-should-not-be-present', :last_modified => Time.now, :content_length => 10, :storage_class => 'STANDARD', :object => double(:data => double(:restore => nil)) ) }
+    let!(:present_object) {double(:key => 'this-should-be-present', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'STANDARD', :object => double(:data => double(:restore => nil)) ) }
+    let!(:archived_object) {double(:key => 'this-should-be-archived', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => nil)) ) }
+    let!(:deep_archived_object) {double(:key => 'this-should-be-archived', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => nil)) ) }
+    let!(:restored_object) {double(:key => 'this-should-be-restored-from-archive', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'GLACIER', :object => double(:data => double(:restore => 'ongoing-request="false", expiry-date="Thu, 01 Jan 2099 00:00:00 GMT"')) ) }
+    let!(:deep_restored_object) {double(:key => 'this-should-be-restored-from-deep-archive', :last_modified => Time.now - cutoff, :content_length => 10, :storage_class => 'DEEP_ARCHIVE', :object => double(:data => double(:restore => 'ongoing-request="false", expiry-date="Thu, 01 Jan 2099 00:00:00 GMT"')) ) }

     let(:objects_list) {
       [
         double(:key => 'exclude-this-file-1', :last_modified => Time.now - 2 * day, :content_length => 100, :storage_class => 'STANDARD'),
         double(:key => 'exclude/logstash', :last_modified => Time.now - 2 * day, :content_length => 50, :storage_class => 'STANDARD'),
         archived_object,
         restored_object,
         deep_restored_object,
-        present_object
+        present_object,
+        present_object_after_cutoff
       ]
     }

     it 'should allow user to exclude files from the s3 bucket' do
       plugin = LogStash::Inputs::S3.new(config.merge({ "exclude_pattern" => "^exclude" }))
       plugin.register
-      files = plugin.list_new_files
+      files = plugin.list_new_files.map { |item| item.key }
       expect(files).to include(present_object.key)
       expect(files).to include(restored_object.key)
       expect(files).to include(deep_restored_object.key)
       expect(files).to_not include('exclude-this-file-1') # matches exclude pattern
       expect(files).to_not include('exclude/logstash') # matches exclude pattern
       expect(files).to_not include(archived_object.key) # archived
       expect(files).to_not include(deep_archived_object.key) # archived
+      expect(files).to_not include(present_object_after_cutoff.key) # after cutoff
       expect(files.size).to eq(3)
     end

     it 'should support not providing a exclude pattern' do
       plugin = LogStash::Inputs::S3.new(config)
       plugin.register
-      files = plugin.list_new_files
+      files = plugin.list_new_files.map { |item| item.key }
       expect(files).to include(present_object.key)
       expect(files).to include(restored_object.key)
       expect(files).to include(deep_restored_object.key)
       expect(files).to include('exclude-this-file-1') # no exclude pattern given
       expect(files).to include('exclude/logstash') # no exclude pattern given
       expect(files).to_not include(archived_object.key) # archived
       expect(files).to_not include(deep_archived_object.key) # archived
+      expect(files).to_not include(present_object_after_cutoff.key) # after cutoff
       expect(files.size).to eq(5)
     end

     context 'when all files are excluded from a bucket' do
       let(:objects_list) {
@@ -202,11 +208,11 @@
         plugin = LogStash::Inputs::S3.new(config.merge({ 'backup_add_prefix' => 'mybackup',
                                                          'backup_to_bucket' => config['bucket']}))
         plugin.register
-        files = plugin.list_new_files
+        files = plugin.list_new_files.map { |item| item.key }
         expect(files).to include(present_object.key)
         expect(files).to_not include('mybackup-log-1') # matches backup prefix
         expect(files.size).to eq(1)
       end
     end
@@ -216,18 +222,19 @@
       allow_any_instance_of(LogStash::Inputs::S3::SinceDB::File).to receive(:read).and_return(Time.now - day)
       plugin.register
-      files = plugin.list_new_files
+      files = plugin.list_new_files.map { |item| item.key }
       expect(files).to include(present_object.key)
       expect(files).to include(restored_object.key)
       expect(files).to include(deep_restored_object.key)
       expect(files).to_not include('exclude-this-file-1') # too old
       expect(files).to_not include('exclude/logstash') # too old
       expect(files).to_not include(archived_object.key) # archived
       expect(files).to_not include(deep_archived_object.key) # archived
+      expect(files).to_not include(present_object_after_cutoff.key) # after cutoff
       expect(files.size).to eq(3)
     end

     it 'should ignore file if the file match the prefix' do
       prefix = 'mysource/'
@@ -239,26 +246,27 @@
       allow_any_instance_of(Aws::S3::Bucket).to receive(:objects).with(:prefix => prefix) { objects_list }
       plugin = LogStash::Inputs::S3.new(config.merge({ 'prefix' => prefix }))
       plugin.register
-      expect(plugin.list_new_files).to eq([present_object.key])
+      expect(plugin.list_new_files.map { |item| item.key }).to eq([present_object.key])
     end

     it 'should sort return object sorted by last_modification date with older first' do
       objects = [
         double(:key => 'YESTERDAY', :last_modified => Time.now - day, :content_length => 5, :storage_class => 'STANDARD'),
         double(:key => 'TODAY', :last_modified => Time.now, :content_length => 5, :storage_class => 'STANDARD'),
+        double(:key => 'TODAY_BEFORE_CUTOFF', :last_modified => Time.now - cutoff, :content_length => 5, :storage_class => 'STANDARD'),
         double(:key => 'TWO_DAYS_AGO', :last_modified => Time.now - 2 * day, :content_length => 5, :storage_class => 'STANDARD')
       ]
       allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { objects }
       plugin = LogStash::Inputs::S3.new(config)
       plugin.register
-      expect(plugin.list_new_files).to eq(['TWO_DAYS_AGO', 'YESTERDAY', 'TODAY'])
+      expect(plugin.list_new_files.map { |item| item.key }).to eq(['TWO_DAYS_AGO', 'YESTERDAY', 'TODAY_BEFORE_CUTOFF'])
     end

     describe "when doing backup on the s3" do
       it 'should copy to another s3 bucket when keeping the original file' do
         plugin = LogStash::Inputs::S3.new(config.merge({ "backup_to_bucket" => "mybackup"}))
@@ -449,11 +457,11 @@
       include_examples "generated events"
     end

     context 'compressed with gzip extension and using custom gzip_pattern option' do
-      let(:config) { super.merge({ "gzip_pattern" => "gee.zip$" }) }
+      let(:config) { super().merge({ "gzip_pattern" => "gee.zip$" }) }
       let(:log) { double(:key => 'log.gee.zip', :last_modified => Time.now - 2 * day, :content_length => 5, :storage_class => 'STANDARD') }
       let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'compressed.log.gee.zip') }

       include_examples "generated events"
     end
@@ -497,11 +505,11 @@
       include_examples "generated events"
     end

     context 'when include_object_properties is set to true' do
-      let(:config) { super.merge({ "include_object_properties" => true }) }
+      let(:config) { super().merge({ "include_object_properties" => true }) }
       let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'uncompressed.log') }

       it 'should extract object properties onto [@metadata][s3]' do
         events = fetch_events(config)
         events.each do |event|
@@ -511,11 +519,11 @@
       include_examples "generated events"
     end

     context 'when include_object_properties is set to false' do
-      let(:config) { super.merge({ "include_object_properties" => false }) }
+      let(:config) { super().merge({ "include_object_properties" => false }) }
       let(:log_file) { File.join(File.dirname(__FILE__), '..', 'fixtures', 'uncompressed.log') }

       it 'should NOT extract object properties onto [@metadata][s3]' do
         events = fetch_events(config)
         events.each do |event|
@@ -523,8 +531,69 @@
         end
       end

       include_examples "generated events"
     end
+  end
+
+  describe "data loss" do
+    let(:s3_plugin) { LogStash::Inputs::S3.new(config) }
+    let(:queue) { [] }
+
+    before do
+      s3_plugin.register
+    end
+
+    context 'events come after cutoff time' do
+      it 'should be processed in next cycle' do
+        s3_objects = [
+          double(:key => 'TWO_DAYS_AGO', :last_modified => Time.now.round - 2 * day, :content_length => 5, :storage_class => 'STANDARD'),
+          double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'),
+          double(:key => 'TODAY_BEFORE_CUTOFF', :last_modified => Time.now.round - cutoff, :content_length => 5, :storage_class => 'STANDARD'),
+          double(:key => 'TODAY', :last_modified => Time.now.round, :content_length => 5, :storage_class => 'STANDARD'),
+          double(:key => 'TODAY', :last_modified => Time.now.round, :content_length => 5, :storage_class => 'STANDARD')
+        ]
+        size = s3_objects.length
+
+        allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { s3_objects }
+        allow_any_instance_of(Aws::S3::Bucket).to receive(:object).and_return(*s3_objects)
+        expect(s3_plugin).to receive(:process_log).at_least(size).and_call_original
+        expect(s3_plugin).to receive(:stop?).and_return(false).at_least(size)
+        expect(s3_plugin).to receive(:download_remote_file).and_return(true).at_least(size)
+        expect(s3_plugin).to receive(:process_local_log).and_return(true).at_least(size)
+
+        # first iteration
+        s3_plugin.process_files(queue)
+
+        # second iteration
+        sleep(cutoff + 1)
+        s3_plugin.process_files(queue)
+      end
+    end
+
+    context 's3 object updated after getting summary' do
+      it 'should not update sincedb' do
+        s3_summary = [
+          double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'),
+          double(:key => 'TODAY', :last_modified => Time.now.round - (cutoff * 10), :content_length => 5, :storage_class => 'STANDARD')
+        ]
+
+        s3_objects = [
+          double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'),
+          double(:key => 'TODAY_UPDATED', :last_modified => Time.now.round, :content_length => 5, :storage_class => 'STANDARD')
+        ]

+        size = s3_objects.length
+
+        allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { s3_summary }
+        allow_any_instance_of(Aws::S3::Bucket).to receive(:object).and_return(*s3_objects)
+        expect(s3_plugin).to receive(:process_log).at_least(size).and_call_original
+        expect(s3_plugin).to receive(:stop?).and_return(false).at_least(size)
+        expect(s3_plugin).to receive(:download_remote_file).and_return(true).at_least(size)
+        expect(s3_plugin).to receive(:process_local_log).and_return(true).at_least(size)
+
+        s3_plugin.process_files(queue)
+        expect(s3_plugin.send(:sincedb).read).to eq(s3_summary[0].last_modified)
+      end
+    end
   end
 end
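The new expectations above revolve around LogStash::Inputs::S3::CUTOFF_SECOND: list_new_files now returns object summaries rather than plain keys (hence the .map { |item| item.key } calls) and leaves out anything modified within the last CUTOFF_SECOND seconds, deferring it to a later run so that recently written objects are not lost, which is what the "data loss" examples assert. As a rough illustration only (the method name, constant value, and arguments below are assumptions for this sketch, not the plugin's actual implementation), the filtering idea looks like this in Ruby:

# Sketch only: keep objects newer than the sincedb timestamp but older than
# the cutoff, sorted oldest first. Objects inside the cutoff window are
# skipped this cycle and picked up on the next run.
CUTOFF_SECOND = 3 # assumed value for illustration

def new_files(objects, sincedb_time)
  cutoff_time = Time.now - CUTOFF_SECOND
  objects
    .select { |obj| obj.last_modified > sincedb_time }  # not yet processed
    .select { |obj| obj.last_modified <= cutoff_time }  # old enough to be stable
    .sort_by(&:last_modified)                           # oldest first
end

Under this sketch, objects such as present_object_after_cutoff and the TODAY doubles above fall inside the cutoff window on the first pass and only qualify on a later cycle, while TODAY_BEFORE_CUTOFF sits exactly on the boundary and is included.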