spec/outputs/s3_spec.rb in logstash-output-s3-3.2.0 vs spec/outputs/s3_spec.rb in logstash-output-s3-4.0.0

- old
+ new

@@ -1,371 +1,88 @@ # encoding: utf-8 -require "logstash/devutils/rspec/spec_helper" require "logstash/outputs/s3" +require "logstash/event" require "logstash/codecs/line" -require "logstash/pipeline" -require "aws-sdk" -require "fileutils" -require_relative "../supports/helpers" +require "stud/temporary" describe LogStash::Outputs::S3 do + let(:prefix) { "super/%{server}" } + let(:region) { "us-east-1" } + let(:bucket_name) { "mybucket" } + let(:options) { { "region" => region, + "bucket" => bucket_name, + "prefix" => prefix, + "restore" => false, + "access_key_id" => "access_key_id", + "secret_access_key" => "secret_access_key" + } } + let(:client) { Aws::S3::Client.new(stub_responses: true) } + let(:mock_bucket) { Aws::S3::Bucket.new(:name => bucket_name, :stub_responses => true, :client => client) } + let(:event) { LogStash::Event.new({ "server" => "overwatch" }) } + let(:event_encoded) { "super hype" } + let(:events_and_encoded) { { event => event_encoded } } + + subject { described_class.new(options) } + before do - # We stub all the calls from S3, for more information see: - # http://ruby.awsblog.com/post/Tx2SU6TYJWQQLC3/Stubbing-AWS-Responses - AWS.stub! - Thread.abort_on_exception = true + allow(subject).to receive(:bucket_resource).and_return(mock_bucket) + allow(LogStash::Outputs::S3::WriteBucketPermissionValidator).to receive(:valid?).with(mock_bucket).and_return(true) end - let(:minimal_settings) { { "access_key_id" => "1234", - "secret_access_key" => "secret", - "bucket" => "my-bucket" } } - - describe "configuration" do - let!(:config) { { "region" => "sa-east-1" } } - + context "#register configuration validation" do describe "signature version" do it "should set the signature version if specified" do - s3 = LogStash::Outputs::S3.new(config.merge({ "signature_version" => 'v4' })) - expect(s3.full_options[:s3_signature_version]).to eql('v4') + ["v2", "v4"].each do |version| + s3 = described_class.new(options.merge({ "signature_version" => version })) + expect(s3.full_options).to include(:s3_signature_version => version) + end end it "should omit the option completely if not specified" do - s3 = LogStash::Outputs::S3.new(config) + s3 = described_class.new(options) expect(s3.full_options.has_key?(:s3_signature_version)).to eql(false) end end - end - describe "#register" do - it "should create the tmp directory if it doesn't exist" do - temporary_directory = Stud::Temporary.pathname("temporary_directory") + describe "temporary directory" do + let(:temporary_directory) { Stud::Temporary.pathname } + let(:options) { super.merge({ "temporary_directory" => temporary_directory }) } - config = { - "access_key_id" => "1234", - "secret_access_key" => "secret", - "bucket" => "logstash", - "size_file" => 10, - "temporary_directory" => temporary_directory - } - - s3 = LogStash::Outputs::S3.new(config) - allow(s3).to receive(:test_s3_write) - s3.register - - expect(Dir.exist?(temporary_directory)).to eq(true) - s3.close - FileUtils.rm_r(temporary_directory) - end - - it "should raise a ConfigurationError if the prefix contains one or more '\^`><' characters" do - config = { - "prefix" => "`no\><^" - } - - s3 = LogStash::Outputs::S3.new(config) - - expect { - s3.register - }.to raise_error(LogStash::ConfigurationError) - end - end - - describe "#generate_temporary_filename" do - before do - allow(Socket).to receive(:gethostname) { "logstash.local" } - end - - it "should add tags to the filename if present" do - config = minimal_settings.merge({ "tags" => ["elasticsearch", "logstash", "kibana"], "temporary_directory" => "/tmp/logstash"}) - s3 = LogStash::Outputs::S3.new(config) - expect(s3.get_temporary_filename).to match(/^ls\.s3\.logstash\.local\.\d{4}-\d{2}\-\d{2}T\d{2}\.\d{2}\.tag_#{config["tags"].join("\.")}\.part0\.txt\Z/) - end - - it "should not add the tags to the filename" do - config = minimal_settings.merge({ "tags" => [], "temporary_directory" => "/tmp/logstash" }) - s3 = LogStash::Outputs::S3.new(config) - expect(s3.get_temporary_filename(3)).to match(/^ls\.s3\.logstash\.local\.\d{4}-\d{2}\-\d{2}T\d{2}\.\d{2}\.part3\.txt\Z/) - end - - it "normalized the temp directory to include the trailing slash if missing" do - s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => "/tmp/logstash" })) - expect(s3.get_temporary_filename).to match(/^ls\.s3\.logstash\.local\.\d{4}-\d{2}\-\d{2}T\d{2}\.\d{2}\.part0\.txt\Z/) - end - end - - describe "#write_on_bucket" do - let!(:fake_data) { Stud::Temporary.file } - - let(:fake_bucket) do - s3 = double('S3Object') - allow(s3).to receive(:write) - s3 - end - - it "should prefix the file on the bucket if a prefix is specified" do - prefix = "my-prefix" - - config = minimal_settings.merge({ - "prefix" => prefix, - "bucket" => "my-bucket" - }) - - expect_any_instance_of(AWS::S3::ObjectCollection).to receive(:[]).with("#{prefix}#{File.basename(fake_data)}") { fake_bucket } - - s3 = LogStash::Outputs::S3.new(config) - allow(s3).to receive(:test_s3_write) - s3.register - s3.write_on_bucket(fake_data) - end - - it 'should use the same local filename if no prefix is specified' do - config = minimal_settings.merge({ - "bucket" => "my-bucket" - }) - - expect_any_instance_of(AWS::S3::ObjectCollection).to receive(:[]).with(File.basename(fake_data)) { fake_bucket } - - s3 = LogStash::Outputs::S3.new(minimal_settings) - allow(s3).to receive(:test_s3_write) - s3.register - s3.write_on_bucket(fake_data) - end - end - - describe "#write_events_to_multiple_files?" do - it 'returns true if the size_file is != 0 ' do - s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 200 })) - expect(s3.write_events_to_multiple_files?).to eq(true) - end - - it 'returns false if size_file is zero or not set' do - s3 = LogStash::Outputs::S3.new(minimal_settings) - expect(s3.write_events_to_multiple_files?).to eq(false) - end - end - - describe "#write_to_tempfile" do - it "should append the event to a file" do - Stud::Temporary.file("logstash", "a+") do |tmp| - s3 = LogStash::Outputs::S3.new(minimal_settings) - allow(s3).to receive(:test_s3_write) - s3.register - s3.tempfile = tmp - s3.write_to_tempfile("test-write") - tmp.rewind - expect(tmp.read).to eq("test-write") + it "creates the directory when it doesn't exist" do + expect(Dir.exist?(temporary_directory)).to be_falsey + subject.register + expect(Dir.exist?(temporary_directory)).to be_truthy end - end - end - describe "#rotate_events_log" do - - context "having a single worker" do - let(:s3) { LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 1024 })) } - - before(:each) do - s3.register + it "raises an error if we cannot write to the directory" do + expect(LogStash::Outputs::S3::WritableDirectoryValidator).to receive(:valid?).with(temporary_directory).and_return(false) + expect { subject.register }.to raise_error(LogStash::ConfigurationError) end - - it "returns true if the tempfile is over the file_size limit" do - Stud::Temporary.file do |tmp| - allow(tmp).to receive(:size) { 2024001 } - - s3.tempfile = tmp - expect(s3.rotate_events_log?).to be(true) - end - end - - it "returns false if the tempfile is under the file_size limit" do - Stud::Temporary.file do |tmp| - allow(tmp).to receive(:size) { 100 } - - s3.tempfile = tmp - expect(s3.rotate_events_log?).to eq(false) - end - end end - context "having periodic rotations" do - let(:s3) { LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 1024, "time_file" => 6e-10 })) } - let(:tmp) { Tempfile.new('s3_rotation_temp_file') } - - before(:each) do - s3.tempfile = tmp - s3.register - end - - after(:each) do - s3.close - tmp.close - tmp.unlink - end - - it "raises no error when periodic rotation happen" do - 1000.times do - expect { s3.rotate_events_log? }.not_to raise_error - end - end + it "validates the prefix" do + s3 = described_class.new(options.merge({ "prefix" => "`no\><^" })) + expect { s3.register }.to raise_error(LogStash::ConfigurationError) end - end - describe "#move_file_to_bucket" do - subject { LogStash::Outputs::S3.new(minimal_settings) } - - it "should always delete the source file" do - tmp = Stud::Temporary.file - - allow(File).to receive(:zero?).and_return(true) - expect(File).to receive(:delete).with(tmp) - - subject.move_file_to_bucket(tmp) - end - - it 'should not upload the file if the size of the file is zero' do - temp_file = Stud::Temporary.file - allow(temp_file).to receive(:zero?).and_return(true) - - expect(subject).not_to receive(:write_on_bucket) - subject.move_file_to_bucket(temp_file) - end - - it "should upload the file if the size > 0" do - tmp = Stud::Temporary.file - - allow(File).to receive(:zero?).and_return(false) - expect(subject).to receive(:write_on_bucket) - - subject.move_file_to_bucket(tmp) - end - end - - describe "#restore_from_crashes" do - it "read the temp directory and upload the matching file to s3" do - s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => "/tmp/logstash/" })) - - expect(Dir).to receive(:[]).with("/tmp/logstash/*.txt").and_return(["/tmp/logstash/01.txt"]) - expect(s3).to receive(:move_file_to_bucket_async).with("/tmp/logstash/01.txt") - - - s3.restore_from_crashes - end - end - - describe "#receive" do - it "should send the event through the codecs" do - data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}, "@timestamp" => "2014-05-30T02:52:17.929Z"} - event = LogStash::Event.new(data) - - expect_any_instance_of(LogStash::Codecs::Line).to receive(:encode).with(event) - - s3 = LogStash::Outputs::S3.new(minimal_settings) - allow(s3).to receive(:test_s3_write) + it "allow to not validate credentials" do + s3 = described_class.new(options.merge({"validate_credentials_on_root_bucket" => false})) + expect(LogStash::Outputs::S3::WriteBucketPermissionValidator).not_to receive(:valid?).with(any_args) s3.register - - s3.receive(event) end end - describe "when rotating the temporary file" do - before { allow(File).to receive(:delete) } - - it "doesn't skip events if using the size_file option" do - Stud::Temporary.directory do |temporary_directory| - size_file = rand(200..20000) - event_count = rand(300..15000) - - config = %Q[ - input { - generator { - count => #{event_count} - } - } - output { - s3 { - access_key_id => "1234" - secret_access_key => "secret" - size_file => #{size_file} - codec => line - temporary_directory => '#{temporary_directory}' - bucket => 'testing' - } - } - ] - - pipeline = LogStash::Pipeline.new(config) - - pipeline_thread = Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? - pipeline_thread.join - - events_written_count = events_in_files(Dir[File.join(temporary_directory, 'ls.*.txt')]) - expect(events_written_count).to eq(event_count) - end + context "receiving events" do + before do + subject.register end - describe "closing" do - let(:options) do - { - "access_key_id" => 1234, - "secret_access_key" => "secret", - "bucket" => "mahbucket" - } - end - subject do - ::LogStash::Outputs::S3.new(options) - end - - before do - subject.register - end - - it "should be clean" do - subject.do_close - end - - it "should remove all worker threads" do - subject.do_close - sleep 1 - expect(subject.upload_workers.map(&:thread).any?(&:alive?)).to be false - end + after do + subject.close end - it "doesn't skip events if using the time_file option", :tag => :slow do - Stud::Temporary.directory do |temporary_directory| - time_file = rand(1..2) - number_of_rotation = rand(2..5) - - config = { - "time_file" => time_file, - "codec" => "line", - "temporary_directory" => temporary_directory, - "bucket" => "testing" - } - - s3 = LogStash::Outputs::S3.new(minimal_settings.merge(config)) - # Make the test run in seconds intead of minutes.. - expect(s3).to receive(:periodic_interval).and_return(time_file) - s3.register - - # Force to have a few files rotation - stop_time = Time.now + (number_of_rotation * time_file) - event_count = 0 - - event = LogStash::Event.new("message" => "Hello World") - - until Time.now > stop_time do - s3.receive(event) - event_count += 1 - end - s3.close - - generated_files = Dir[File.join(temporary_directory, 'ls.*.txt')] - - events_written_count = events_in_files(generated_files) - - # Skew times can affect the number of rotation.. - expect(generated_files.count).to be_within(number_of_rotation).of(number_of_rotation + 1) - expect(events_written_count).to eq(event_count) - end + it "uses `Event#sprintf` for the prefix" do + expect(event).to receive(:sprintf).with(prefix).and_return("super/overwatch") + subject.multi_receive_encoded(events_and_encoded) end end end