spec/outputs/s3_spec.rb in logstash-output-s3-0.1.1 vs spec/outputs/s3_spec.rb in logstash-output-s3-0.1.2
- old
+ new
@@ -1,6 +1,324 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
-require 'logstash/outputs/s3'
+require "logstash/outputs/s3"
+require "logstash/codecs/line"
+require "logstash/pipeline"
+require "aws-sdk"
+require "fileutils"
+require_relative "../supports/helpers"
describe LogStash::Outputs::S3 do
+ before do
+ # We stub all the calls from S3, for more information see:
+ # http://ruby.awsblog.com/post/Tx2SU6TYJWQQLC3/Stubbing-AWS-Responses
+ AWS.stub!
+
+ Thread.abort_on_exception = true
+ end
+
+ let(:minimal_settings) { { "access_key_id" => "1234",
+ "secret_access_key" => "secret",
+ "bucket" => "my-bucket" } }
+
+ describe "configuration" do
+ let!(:config) { { "endpoint_region" => "sa-east-1" } }
+
+ it "should support the deprecated endpoint_region as a configuration option" do
+ s3 = LogStash::Outputs::S3.new(config)
+ expect(s3.aws_options_hash[:s3_endpoint]).to eq("s3-sa-east-1.amazonaws.com")
+ end
+
+ it "should fallback to region if endpoint_region isn't defined" do
+ s3 = LogStash::Outputs::S3.new(config.merge({ "region" => 'sa-east-1' }))
+ expect(s3.aws_options_hash).to include(:s3_endpoint => "s3-sa-east-1.amazonaws.com")
+ end
+ end
+
+ describe "#register" do
+ it "should create the tmp directory if it doesn't exist" do
+ temporary_directory = Stud::Temporary.pathname("temporary_directory")
+
+ config = {
+ "access_key_id" => "1234",
+ "secret_access_key" => "secret",
+ "bucket" => "logstash",
+ "size_file" => 10,
+ "temporary_directory" => temporary_directory
+ }
+
+ s3 = LogStash::Outputs::S3.new(config)
+ allow(s3).to receive(:test_s3_write)
+ s3.register
+
+ expect(Dir.exist?(temporary_directory)).to eq(true)
+ FileUtils.rm_r(temporary_directory)
+ end
+
+ it "should raise a ConfigurationError if the prefix contains one or more '\^`><' characters" do
+ config = {
+ "prefix" => "`no\><^"
+ }
+
+ s3 = LogStash::Outputs::S3.new(config)
+
+ expect {
+ s3.register
+ }.to raise_error(LogStash::ConfigurationError)
+ end
+ end
+
+ describe "#generate_temporary_filename" do
+ before do
+ Socket.stub(:gethostname) { "logstash.local" }
+ Time.stub(:now) { Time.new('2015-10-09-09:00') }
+ end
+
+ it "should add tags to the filename if present" do
+ config = minimal_settings.merge({ "tags" => ["elasticsearch", "logstash", "kibana"], "temporary_directory" => "/tmp/logstash"})
+ s3 = LogStash::Outputs::S3.new(config)
+ expect(s3.get_temporary_filename).to eq("ls.s3.logstash.local.2015-01-01T00.00.tag_elasticsearch.logstash.kibana.part0.txt")
+ end
+
+ it "should not add the tags to the filename" do
+ config = minimal_settings.merge({ "tags" => [], "temporary_directory" => "/tmp/logstash" })
+ s3 = LogStash::Outputs::S3.new(config)
+ expect(s3.get_temporary_filename(3)).to eq("ls.s3.logstash.local.2015-01-01T00.00.part3.txt")
+ end
+
+ it "normalizes the temp directory to include the trailing slash if missing" do
+ s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => "/tmp/logstash" }))
+ expect(s3.get_temporary_filename).to eq("ls.s3.logstash.local.2015-01-01T00.00.part0.txt")
+ end
+ end
+
+ describe "#write_on_bucket" do
+ after(:all) do
+ File.unlink(fake_data.path)
+ end
+
+ let!(:fake_data) { Stud::Temporary.file }
+
+ let(:fake_bucket) do
+ s3 = double('S3Object')
+ s3.stub(:write)
+ s3
+ end
+
+ it "should prefix the file on the bucket if a prefix is specified" do
+ prefix = "my-prefix"
+
+ config = minimal_settings.merge({
+ "prefix" => prefix,
+ "bucket" => "my-bucket"
+ })
+
+ expect_any_instance_of(AWS::S3::ObjectCollection).to receive(:[]).with("#{prefix}#{File.basename(fake_data)}") { fake_bucket }
+
+ s3 = LogStash::Outputs::S3.new(config)
+ allow(s3).to receive(:test_s3_write)
+ s3.register
+ s3.write_on_bucket(fake_data)
+ end
+
+ it 'should use the same local filename if no prefix is specified' do
+ config = minimal_settings.merge({
+ "bucket" => "my-bucket"
+ })
+
+ expect_any_instance_of(AWS::S3::ObjectCollection).to receive(:[]).with(File.basename(fake_data)) { fake_bucket }
+
+ s3 = LogStash::Outputs::S3.new(minimal_settings)
+ allow(s3).to receive(:test_s3_write)
+ s3.register
+ s3.write_on_bucket(fake_data)
+ end
+ end
+
+ describe "#write_events_to_multiple_files?" do
+ it 'returns true if the size_file is != 0 ' do
+ s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 200 }))
+ expect(s3.write_events_to_multiple_files?).to eq(true)
+ end
+
+ it 'returns false if size_file is zero or not set' do
+ s3 = LogStash::Outputs::S3.new(minimal_settings)
+ expect(s3.write_events_to_multiple_files?).to eq(false)
+ end
+ end
+
+ describe "#write_to_tempfile" do
+ it "should append the event to a file" do
+ Stud::Temporary.file("logstash", "a+") do |tmp|
+ s3 = LogStash::Outputs::S3.new(minimal_settings)
+ allow(s3).to receive(:test_s3_write)
+ s3.register
+ s3.tempfile = tmp
+ s3.write_to_tempfile("test-write")
+ tmp.rewind
+ expect(tmp.read).to eq("test-write")
+ end
+ end
+ end
+
+ describe "#rotate_events_log" do
+ let(:s3) { LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 1024 })) }
+
+ it "returns true if the tempfile is over the file_size limit" do
+ Stud::Temporary.file do |tmp|
+ tmp.stub(:size) { 2024001 }
+
+ s3.tempfile = tmp
+ expect(s3.rotate_events_log?).to be(true)
+ end
+ end
+
+ it "returns false if the tempfile is under the file_size limit" do
+ Stud::Temporary.file do |tmp|
+ tmp.stub(:size) { 100 }
+
+ s3.tempfile = tmp
+ expect(s3.rotate_events_log?).to eq(false)
+ end
+ end
+ end
+
+ describe "#move_file_to_bucket" do
+ let!(:s3) { LogStash::Outputs::S3.new(minimal_settings) }
+
+ before do
+ # Assume the AWS test credentials pass.
+ allow(s3).to receive(:test_s3_write)
+ s3.register
+ end
+
+ it "should always delete the source file" do
+ tmp = Stud::Temporary.file
+
+ allow(File).to receive(:zero?).and_return(true)
+ expect(File).to receive(:delete).with(tmp)
+
+ s3.move_file_to_bucket(tmp)
+ end
+
+ it 'should not upload the file if the size of the file is zero' do
+ temp_file = Stud::Temporary.file
+ allow(temp_file).to receive(:zero?).and_return(true)
+
+ expect(s3).not_to receive(:write_on_bucket)
+ s3.move_file_to_bucket(temp_file)
+ end
+
+ it "should upload the file if the size > 0" do
+ tmp = Stud::Temporary.file
+
+ allow(File).to receive(:zero?).and_return(false)
+ expect(s3).to receive(:write_on_bucket)
+
+ s3.move_file_to_bucket(tmp)
+ end
+ end
+
+ describe "#restore_from_crashes" do
+ it "read the temp directory and upload the matching file to s3" do
+ s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => "/tmp/logstash/" }))
+
+ expect(Dir).to receive(:[]).with("/tmp/logstash/*.txt").and_return(["/tmp/logstash/01.txt"])
+ expect(s3).to receive(:move_file_to_bucket_async).with("/tmp/logstash/01.txt")
+
+
+ s3.restore_from_crashes
+ end
+ end
+
+ describe "#receive" do
+ it "should send the event through the codecs" do
+ data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}, "@timestamp" => "2014-05-30T02:52:17.929Z"}
+ event = LogStash::Event.new(data)
+
+ expect_any_instance_of(LogStash::Codecs::Line).to receive(:encode).with(event)
+
+ s3 = LogStash::Outputs::S3.new(minimal_settings)
+ allow(s3).to receive(:test_s3_write)
+ s3.register
+
+ s3.receive(event)
+ end
+ end
+
+ describe "when rotating the temporary file" do
+ before { allow(File).to receive(:delete) }
+
+ it "doesn't skip events if using the size_file option" do
+ Stud::Temporary.directory do |temporary_directory|
+ size_file = rand(200..20000)
+ event_count = rand(300..15000)
+
+ config = %Q[
+ input {
+ generator {
+ count => #{event_count}
+ }
+ }
+ output {
+ s3 {
+ access_key_id => "1234"
+ secret_access_key => "secret"
+ size_file => #{size_file}
+ codec => line
+ temporary_directory => '#{temporary_directory}'
+ bucket => 'testing'
+ }
+ }
+ ]
+
+ pipeline = LogStash::Pipeline.new(config)
+
+ pipeline_thread = Thread.new { pipeline.run }
+ sleep 0.1 while !pipeline.ready?
+ pipeline_thread.join
+
+ events_written_count = events_in_files(Dir[File.join(temporary_directory, 'ls.*.txt')])
+ expect(events_written_count).to eq(event_count)
+ end
+ end
+
+ it "doesn't skip events if using the time_file option", :tag => :slow do
+ Stud::Temporary.directory do |temporary_directory|
+ time_file = rand(5..10)
+ number_of_rotation = rand(4..10)
+
+ config = {
+ "time_file" => time_file,
+ "codec" => "line",
+ "temporary_directory" => temporary_directory,
+ "bucket" => "testing"
+ }
+
+ s3 = LogStash::Outputs::S3.new(minimal_settings.merge(config))
+ # Make the test run in seconds instead of minutes.
+ allow(s3).to receive(:periodic_interval).and_return(time_file)
+ s3.register
+
+ # Force to have a few files rotation
+ stop_time = Time.now + (number_of_rotation * time_file)
+ event_count = 0
+
+ event = LogStash::Event.new("message" => "Hello World")
+
+ until Time.now > stop_time do
+ s3.receive(event)
+ event_count += 1
+ end
+ s3.teardown
+
+ generated_files = Dir[File.join(temporary_directory, 'ls.*.txt')]
+
+ events_written_count = events_in_files(generated_files)
+
+ # Skew times can affect the number of rotation..
+ expect(generated_files.count).to be_within(number_of_rotation).of(number_of_rotation + 1)
+ expect(events_written_count).to eq(event_count)
+ end
+ end
+ end
end