spec/outputs/s3_spec.rb in logstash-output-s3-3.2.0 vs spec/outputs/s3_spec.rb in logstash-output-s3-4.0.0
- old
+ new
@@ -1,371 +1,88 @@
# encoding: utf-8
-require "logstash/devutils/rspec/spec_helper"
require "logstash/outputs/s3"
+require "logstash/event"
require "logstash/codecs/line"
-require "logstash/pipeline"
-require "aws-sdk"
-require "fileutils"
-require_relative "../supports/helpers"
+require "stud/temporary"
describe LogStash::Outputs::S3 do
+ let(:prefix) { "super/%{server}" }
+ let(:region) { "us-east-1" }
+ let(:bucket_name) { "mybucket" }
+ let(:options) { { "region" => region,
+ "bucket" => bucket_name,
+ "prefix" => prefix,
+ "restore" => false,
+ "access_key_id" => "access_key_id",
+ "secret_access_key" => "secret_access_key"
+ } }
+ let(:client) { Aws::S3::Client.new(stub_responses: true) }
+ let(:mock_bucket) { Aws::S3::Bucket.new(:name => bucket_name, :stub_responses => true, :client => client) }
+ let(:event) { LogStash::Event.new({ "server" => "overwatch" }) }
+ let(:event_encoded) { "super hype" }
+ let(:events_and_encoded) { { event => event_encoded } }
+
+ subject { described_class.new(options) }
+
before do
- # We stub all the calls from S3, for more information see:
- # http://ruby.awsblog.com/post/Tx2SU6TYJWQQLC3/Stubbing-AWS-Responses
- AWS.stub!
- Thread.abort_on_exception = true
+ allow(subject).to receive(:bucket_resource).and_return(mock_bucket)
+ allow(LogStash::Outputs::S3::WriteBucketPermissionValidator).to receive(:valid?).with(mock_bucket).and_return(true)
end
- let(:minimal_settings) { { "access_key_id" => "1234",
- "secret_access_key" => "secret",
- "bucket" => "my-bucket" } }
-
- describe "configuration" do
- let!(:config) { { "region" => "sa-east-1" } }
-
+ context "#register configuration validation" do
describe "signature version" do
it "should set the signature version if specified" do
- s3 = LogStash::Outputs::S3.new(config.merge({ "signature_version" => 'v4' }))
- expect(s3.full_options[:s3_signature_version]).to eql('v4')
+ ["v2", "v4"].each do |version|
+ s3 = described_class.new(options.merge({ "signature_version" => version }))
+ expect(s3.full_options).to include(:s3_signature_version => version)
+ end
end
it "should omit the option completely if not specified" do
- s3 = LogStash::Outputs::S3.new(config)
+ s3 = described_class.new(options)
expect(s3.full_options.has_key?(:s3_signature_version)).to eql(false)
end
end
- end
- describe "#register" do
- it "should create the tmp directory if it doesn't exist" do
- temporary_directory = Stud::Temporary.pathname("temporary_directory")
+ describe "temporary directory" do
+ let(:temporary_directory) { Stud::Temporary.pathname }
+ let(:options) { super.merge({ "temporary_directory" => temporary_directory }) }
- config = {
- "access_key_id" => "1234",
- "secret_access_key" => "secret",
- "bucket" => "logstash",
- "size_file" => 10,
- "temporary_directory" => temporary_directory
- }
-
- s3 = LogStash::Outputs::S3.new(config)
- allow(s3).to receive(:test_s3_write)
- s3.register
-
- expect(Dir.exist?(temporary_directory)).to eq(true)
- s3.close
- FileUtils.rm_r(temporary_directory)
- end
-
- it "should raise a ConfigurationError if the prefix contains one or more '\^`><' characters" do
- config = {
- "prefix" => "`no\><^"
- }
-
- s3 = LogStash::Outputs::S3.new(config)
-
- expect {
- s3.register
- }.to raise_error(LogStash::ConfigurationError)
- end
- end
-
- describe "#generate_temporary_filename" do
- before do
- allow(Socket).to receive(:gethostname) { "logstash.local" }
- end
-
- it "should add tags to the filename if present" do
- config = minimal_settings.merge({ "tags" => ["elasticsearch", "logstash", "kibana"], "temporary_directory" => "/tmp/logstash"})
- s3 = LogStash::Outputs::S3.new(config)
- expect(s3.get_temporary_filename).to match(/^ls\.s3\.logstash\.local\.\d{4}-\d{2}\-\d{2}T\d{2}\.\d{2}\.tag_#{config["tags"].join("\.")}\.part0\.txt\Z/)
- end
-
- it "should not add the tags to the filename" do
- config = minimal_settings.merge({ "tags" => [], "temporary_directory" => "/tmp/logstash" })
- s3 = LogStash::Outputs::S3.new(config)
- expect(s3.get_temporary_filename(3)).to match(/^ls\.s3\.logstash\.local\.\d{4}-\d{2}\-\d{2}T\d{2}\.\d{2}\.part3\.txt\Z/)
- end
-
- it "normalized the temp directory to include the trailing slash if missing" do
- s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => "/tmp/logstash" }))
- expect(s3.get_temporary_filename).to match(/^ls\.s3\.logstash\.local\.\d{4}-\d{2}\-\d{2}T\d{2}\.\d{2}\.part0\.txt\Z/)
- end
- end
-
- describe "#write_on_bucket" do
- let!(:fake_data) { Stud::Temporary.file }
-
- let(:fake_bucket) do
- s3 = double('S3Object')
- allow(s3).to receive(:write)
- s3
- end
-
- it "should prefix the file on the bucket if a prefix is specified" do
- prefix = "my-prefix"
-
- config = minimal_settings.merge({
- "prefix" => prefix,
- "bucket" => "my-bucket"
- })
-
- expect_any_instance_of(AWS::S3::ObjectCollection).to receive(:[]).with("#{prefix}#{File.basename(fake_data)}") { fake_bucket }
-
- s3 = LogStash::Outputs::S3.new(config)
- allow(s3).to receive(:test_s3_write)
- s3.register
- s3.write_on_bucket(fake_data)
- end
-
- it 'should use the same local filename if no prefix is specified' do
- config = minimal_settings.merge({
- "bucket" => "my-bucket"
- })
-
- expect_any_instance_of(AWS::S3::ObjectCollection).to receive(:[]).with(File.basename(fake_data)) { fake_bucket }
-
- s3 = LogStash::Outputs::S3.new(minimal_settings)
- allow(s3).to receive(:test_s3_write)
- s3.register
- s3.write_on_bucket(fake_data)
- end
- end
-
- describe "#write_events_to_multiple_files?" do
- it 'returns true if the size_file is != 0 ' do
- s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 200 }))
- expect(s3.write_events_to_multiple_files?).to eq(true)
- end
-
- it 'returns false if size_file is zero or not set' do
- s3 = LogStash::Outputs::S3.new(minimal_settings)
- expect(s3.write_events_to_multiple_files?).to eq(false)
- end
- end
-
- describe "#write_to_tempfile" do
- it "should append the event to a file" do
- Stud::Temporary.file("logstash", "a+") do |tmp|
- s3 = LogStash::Outputs::S3.new(minimal_settings)
- allow(s3).to receive(:test_s3_write)
- s3.register
- s3.tempfile = tmp
- s3.write_to_tempfile("test-write")
- tmp.rewind
- expect(tmp.read).to eq("test-write")
+ it "creates the directory when it doesn't exist" do
+ expect(Dir.exist?(temporary_directory)).to be_falsey
+ subject.register
+ expect(Dir.exist?(temporary_directory)).to be_truthy
end
- end
- end
- describe "#rotate_events_log" do
-
- context "having a single worker" do
- let(:s3) { LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 1024 })) }
-
- before(:each) do
- s3.register
+ it "raises an error if we cannot write to the directory" do
+ expect(LogStash::Outputs::S3::WritableDirectoryValidator).to receive(:valid?).with(temporary_directory).and_return(false)
+ expect { subject.register }.to raise_error(LogStash::ConfigurationError)
end
-
- it "returns true if the tempfile is over the file_size limit" do
- Stud::Temporary.file do |tmp|
- allow(tmp).to receive(:size) { 2024001 }
-
- s3.tempfile = tmp
- expect(s3.rotate_events_log?).to be(true)
- end
- end
-
- it "returns false if the tempfile is under the file_size limit" do
- Stud::Temporary.file do |tmp|
- allow(tmp).to receive(:size) { 100 }
-
- s3.tempfile = tmp
- expect(s3.rotate_events_log?).to eq(false)
- end
- end
end
- context "having periodic rotations" do
- let(:s3) { LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 1024, "time_file" => 6e-10 })) }
- let(:tmp) { Tempfile.new('s3_rotation_temp_file') }
-
- before(:each) do
- s3.tempfile = tmp
- s3.register
- end
-
- after(:each) do
- s3.close
- tmp.close
- tmp.unlink
- end
-
- it "raises no error when periodic rotation happen" do
- 1000.times do
- expect { s3.rotate_events_log? }.not_to raise_error
- end
- end
+ it "validates the prefix" do
+ s3 = described_class.new(options.merge({ "prefix" => "`no\><^" }))
+ expect { s3.register }.to raise_error(LogStash::ConfigurationError)
end
- end
- describe "#move_file_to_bucket" do
- subject { LogStash::Outputs::S3.new(minimal_settings) }
-
- it "should always delete the source file" do
- tmp = Stud::Temporary.file
-
- allow(File).to receive(:zero?).and_return(true)
- expect(File).to receive(:delete).with(tmp)
-
- subject.move_file_to_bucket(tmp)
- end
-
- it 'should not upload the file if the size of the file is zero' do
- temp_file = Stud::Temporary.file
- allow(temp_file).to receive(:zero?).and_return(true)
-
- expect(subject).not_to receive(:write_on_bucket)
- subject.move_file_to_bucket(temp_file)
- end
-
- it "should upload the file if the size > 0" do
- tmp = Stud::Temporary.file
-
- allow(File).to receive(:zero?).and_return(false)
- expect(subject).to receive(:write_on_bucket)
-
- subject.move_file_to_bucket(tmp)
- end
- end
-
- describe "#restore_from_crashes" do
- it "read the temp directory and upload the matching file to s3" do
- s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => "/tmp/logstash/" }))
-
- expect(Dir).to receive(:[]).with("/tmp/logstash/*.txt").and_return(["/tmp/logstash/01.txt"])
- expect(s3).to receive(:move_file_to_bucket_async).with("/tmp/logstash/01.txt")
-
-
- s3.restore_from_crashes
- end
- end
-
- describe "#receive" do
- it "should send the event through the codecs" do
- data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}, "@timestamp" => "2014-05-30T02:52:17.929Z"}
- event = LogStash::Event.new(data)
-
- expect_any_instance_of(LogStash::Codecs::Line).to receive(:encode).with(event)
-
- s3 = LogStash::Outputs::S3.new(minimal_settings)
- allow(s3).to receive(:test_s3_write)
+ it "allow to not validate credentials" do
+ s3 = described_class.new(options.merge({"validate_credentials_on_root_bucket" => false}))
+ expect(LogStash::Outputs::S3::WriteBucketPermissionValidator).not_to receive(:valid?).with(any_args)
s3.register
-
- s3.receive(event)
end
end
- describe "when rotating the temporary file" do
- before { allow(File).to receive(:delete) }
-
- it "doesn't skip events if using the size_file option" do
- Stud::Temporary.directory do |temporary_directory|
- size_file = rand(200..20000)
- event_count = rand(300..15000)
-
- config = %Q[
- input {
- generator {
- count => #{event_count}
- }
- }
- output {
- s3 {
- access_key_id => "1234"
- secret_access_key => "secret"
- size_file => #{size_file}
- codec => line
- temporary_directory => '#{temporary_directory}'
- bucket => 'testing'
- }
- }
- ]
-
- pipeline = LogStash::Pipeline.new(config)
-
- pipeline_thread = Thread.new { pipeline.run }
- sleep 0.1 while !pipeline.ready?
- pipeline_thread.join
-
- events_written_count = events_in_files(Dir[File.join(temporary_directory, 'ls.*.txt')])
- expect(events_written_count).to eq(event_count)
- end
+ context "receiving events" do
+ before do
+ subject.register
end
- describe "closing" do
- let(:options) do
- {
- "access_key_id" => 1234,
- "secret_access_key" => "secret",
- "bucket" => "mahbucket"
- }
- end
- subject do
- ::LogStash::Outputs::S3.new(options)
- end
-
- before do
- subject.register
- end
-
- it "should be clean" do
- subject.do_close
- end
-
- it "should remove all worker threads" do
- subject.do_close
- sleep 1
- expect(subject.upload_workers.map(&:thread).any?(&:alive?)).to be false
- end
+ after do
+ subject.close
end
- it "doesn't skip events if using the time_file option", :tag => :slow do
- Stud::Temporary.directory do |temporary_directory|
- time_file = rand(1..2)
- number_of_rotation = rand(2..5)
-
- config = {
- "time_file" => time_file,
- "codec" => "line",
- "temporary_directory" => temporary_directory,
- "bucket" => "testing"
- }
-
- s3 = LogStash::Outputs::S3.new(minimal_settings.merge(config))
- # Make the test run in seconds intead of minutes..
- expect(s3).to receive(:periodic_interval).and_return(time_file)
- s3.register
-
- # Force to have a few files rotation
- stop_time = Time.now + (number_of_rotation * time_file)
- event_count = 0
-
- event = LogStash::Event.new("message" => "Hello World")
-
- until Time.now > stop_time do
- s3.receive(event)
- event_count += 1
- end
- s3.close
-
- generated_files = Dir[File.join(temporary_directory, 'ls.*.txt')]
-
- events_written_count = events_in_files(generated_files)
-
- # Skew times can affect the number of rotation..
- expect(generated_files.count).to be_within(number_of_rotation).of(number_of_rotation + 1)
- expect(events_written_count).to eq(event_count)
- end
+ it "uses `Event#sprintf` for the prefix" do
+ expect(event).to receive(:sprintf).with(prefix).and_return("super/overwatch")
+ subject.multi_receive_encoded(events_and_encoded)
end
end
end