lib/logstash/outputs/s3.rb in logstash-output-s3-0.1.1 vs lib/logstash/outputs/s3.rb in logstash-output-s3-0.1.2
- old
+ new
@@ -1,13 +1,16 @@
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
+require "logstash/plugin_mixins/aws_config"
+require "stud/temporary"
require "socket" # for Socket.gethostname
+require "thread"
+require "tmpdir"
+require "fileutils"
-# TODO integrate aws_config in the future
-#require "logstash/plugin_mixins/aws_config"
-#
+
# INFORMATION:
#
# This plugin stores logstash events in Amazon Simple Storage Service (Amazon S3).
# To use it you need AWS credentials and an S3 bucket.
# Be careful to have write permission on the S3 bucket, and run logstash as a super user so it can establish the connection.
@@ -32,36 +35,21 @@
#
##[Note] :
#
## If you specify both size_file and time_file, a file is created for each tag (if specified). When time_file
## elapses or a file's size exceeds size_file, the files are pushed to the S3 bucket and deleted from the local disk.
-#
## If you specify time_file but not size_file, only one file is created for each tag (if specified).
## When time_file elapses, the files are pushed to the S3 bucket and deleted from the local disk.
#
## If you specify size_file but not time_file, a file is created for each tag (if specified).
## When a file's size exceeds size_file, it is pushed to the S3 bucket and deleted from the local disk.
#
## If you specify neither size_file nor time_file, you get a curious mode: only one file is created for each tag (if specified).
## The file stays in the temporary directory and is not pushed to the bucket until logstash restarts.
#
-# INFORMATION ABOUT CLASS:
#
-# I tried to comment the class at best i could do.
-# I think there are much thing to improve, but if you want some points to develop here a list:
-#
-# TODO Integrate aws_config in the future
-# TODO Find a method to push them all files when logtstash close the session.
-# TODO Integrate @field on the path file
-# TODO Permanent connection or on demand? For now on demand, but isn't a good implementation.
-# Use a while or a thread to try the connection before break a time_out and signal an error.
-# TODO If you have bugs report or helpful advice contact me, but remember that this code is much mine as much as yours,
-# try to work on it if you want :)
-#
-#
-# USAGE:
-#
+# #### Usage:
# This is an example of logstash config:
# [source,ruby]
# output {
# s3{
# access_key_id => "crazy_key" (required)
@@ -71,287 +59,361 @@
# size_file => 2048 (optional)
# time_file => 5 (optional)
# format => "plain" (optional)
# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" )
# }
-# }
#
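+ # A fuller sketch combining the optional settings defined further below
+ # (all values here are placeholders, credentials omitted for brevity):
+ # [source,ruby]
+ # output {
+ #    s3{
+ #      bucket => "my_bucket"
+ #      size_file => 2048
+ #      time_file => 5
+ #      prefix => "logs/"
+ #      temporary_directory => "/tmp/logstash"
+ #      restore => true
+ #      upload_workers_count => 2
+ #    }
+ # }
+ #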
-# We analize this:
-#
-# access_key_id => "crazy_key"
-# Amazon will give you the key for use their service if you buy it or try it. (not very much open source anyway)
-#
-# secret_access_key => "monkey_access_key"
-# Amazon will give you the secret_access_key for use their service if you buy it or try it . (not very much open source anyway).
-#
-# endpoint_region => "eu-west-1"
-# When you make a contract with Amazon, you should know where the services you use.
-#
-# bucket => "boss_please_open_your_bucket"
-# Be careful you have the permission to write on bucket and know the name.
-#
-# size_file => 2048
-# Means the size, in KB, of files who can store on temporary directory before you will be pushed on bucket.
-# Is useful if you have a little server with poor space on disk and you don't want blow up the server with unnecessary temporary log files.
-#
-# time_file => 5
-# Means, in minutes, the time before the files will be pushed on bucket. Is useful if you want to push the files every specific time.
-#
-# format => "plain"
-# Means the format of events you want to store in the files
-#
-# canned_acl => "private"
-# The S3 canned ACL to use when putting the file. Defaults to "private".
-#
-# LET'S ROCK AND ROLL ON THE CODE!
-#
class LogStash::Outputs::S3 < LogStash::Outputs::Base
- #TODO integrate aws_config in the future
- # include LogStash::PluginMixins::AwsConfig
+ include LogStash::PluginMixins::AwsConfig
- config_name "s3"
- milestone 1
+ TEMPFILE_EXTENSION = "txt"
+ S3_INVALID_CHARACTERS = /[\^`><]/
- # Aws access_key.
- config :access_key_id, :validate => :string
+ config_name "s3"
+ milestone 1
+ default :codec, 'line'
- # Aws secret_access_key
- config :secret_access_key, :validate => :string
+ # S3 bucket
+ config :bucket, :validate => :string
- # S3 bucket
- config :bucket, :validate => :string
+ # AWS endpoint_region
+ config :endpoint_region, :validate => ["us-east-1", "us-west-1", "us-west-2",
+ "eu-west-1", "ap-southeast-1", "ap-southeast-2",
+ "ap-northeast-1", "sa-east-1", "us-gov-west-1"], :deprecated => 'Deprecated, use region instead.'
- # Aws endpoint_region
- config :endpoint_region, :validate => ["us-east-1", "us-west-1", "us-west-2",
- "eu-west-1", "ap-southeast-1", "ap-southeast-2",
- "ap-northeast-1", "sa-east-1", "us-gov-west-1"], :default => "us-east-1"
+ # Set the file size in bytes. When a file stored on the bucket would exceed size_file,
+ # the events are split across two or more files.
+ # If you have tags, a separate size-bounded file is generated for every set of tags.
+ ##NOTE: defining a file size is the better choice, because events are buffered in a local temporary file on disk before being put on the bucket.
+ config :size_file, :validate => :number, :default => 0
- # Set the size of file in KB, this means that files on bucket when have dimension > file_size, they are stored in two or more file.
- # If you have tags then it will generate a specific size file for every tags
- ##NOTE: define size of file is the better thing, because generate a local temporary file on disk and then put it in bucket.
- config :size_file, :validate => :number, :default => 0
+ # Set the time, in minutes, after which the current sub_time_section of the bucket is closed.
+ # If you also define size_file, you get a number of files per time section and per tag.
+ # With 0 the file stays on the listener indefinitely; beware that with both time_file 0 and size_file 0
+ # the file is never put on the bucket: for now the plugin can only upload it when logstash restarts.
+ config :time_file, :validate => :number, :default => 0
- # Set the time, in minutes, to close the current sub_time_section of bucket.
- # If you define file_size you have a number of files in consideration of the section and the current tag.
- # 0 stay all time on listerner, beware if you specific 0 and size_file 0, because you will not put the file on bucket,
- # for now the only thing this plugin can do is to put the file when logstash restart.
- config :time_file, :validate => :number, :default => 0
+ ## IMPORTANT: if you run multiple instances of this output, specify "restore => true" on one of them and "restore => false" on the others.
+ ## This is a hack so the new files are not destroyed after the initial files have been restored.
+ ## If you do not specify "restore => true", the files left over when logstash crashes or is restarted are not sent to the bucket,
+ ## even if you run a single instance.
+ config :restore, :validate => :boolean, :default => false
- # The event format you want to store in files. Defaults to plain text.
- config :format, :validate => [ "json", "plain", "nil" ], :default => "plain"
+ # The S3 canned ACL to use when putting the file. Defaults to "private".
+ config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read"],
+ :default => "private"
- ## IMPORTANT: if you use multiple instance of s3, you should specify on one of them the "restore=> true" and on the others "restore => false".
- ## This is hack for not destroy the new files after restoring the initial files.
- ## If you do not specify "restore => true" when logstash crashes or is restarted, the files are not sent into the bucket,
- ## for example if you have single Instance.
- config :restore, :validate => :boolean, :default => false
+ # Set the directory where logstash stores the temporary files before sending them to S3.
+ # Defaults to a "logstash" folder under the OS temporary directory, e.g. /tmp/logstash on Linux.
+ config :temporary_directory, :validate => :string, :default => File.join(Dir.tmpdir, "logstash")
- # Aws canned ACL
- config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read"],
- :default => "private"
+ # Specify a prefix for the uploaded filenames; this can simulate directories on S3.
+ config :prefix, :validate => :string, :default => ''
- # Method to set up the aws configuration and establish connection
- def aws_s3_config
+ # Specify how many workers to use to upload the files to S3
+ config :upload_workers_count, :validate => :number, :default => 1
- @endpoint_region == 'us-east-1' ? @endpoint_region = 's3.amazonaws.com' : @endpoint_region = 's3-'+@endpoint_region+'.amazonaws.com'
+ # Exposed attributes for testing purposes.
+ attr_accessor :tempfile
+ attr_reader :page_counter
+ attr_reader :s3
- @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @endpoint_region)
+ def aws_s3_config
+ @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @region)
+ @s3 = AWS::S3.new(aws_options_hash)
+ end
- AWS.config(
- :access_key_id => @access_key_id,
- :secret_access_key => @secret_access_key,
- :s3_endpoint => @endpoint_region
- )
- @s3 = AWS::S3.new
+ def aws_service_endpoint(region)
+ # Make the deprecated endpoint_region work
+ # TODO: (ph) Remove this after deprecation.
+
+ if @endpoint_region
+ region_to_use = @endpoint_region
+ else
+ region_to_use = @region
+ end
- end
+ return {
+ :s3_endpoint => region_to_use == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region_to_use}.amazonaws.com"
+ }
+ end
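+ # For illustration: region "us-east-1" resolves to "s3.amazonaws.com", while
+ # e.g. "eu-west-1" resolves to "s3-eu-west-1.amazonaws.com".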
- # This method is used to manage sleep and awaken thread.
- def time_alert(interval)
+ public
+ def write_on_bucket(file)
+ # find and use the bucket
+ bucket = @s3.buckets[@bucket]
- Thread.new do
- loop do
- start_time = Time.now
- yield
- elapsed = Time.now - start_time
- sleep([interval - elapsed, 0].max)
- end
- end
+ remote_filename = "#{@prefix}#{File.basename(file)}"
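+ # e.g. a prefix of "logs/" (placeholder value) and a local file
+ # "/tmp/logstash/ls.s3.example.part0.txt" yield the S3 key "logs/ls.s3.example.part0.txt"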
- end
+ @logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket)
- # this method is used for write files on bucket. It accept the file and the name of file.
- def write_on_bucket (file_data, file_basename)
+ begin
+ # prepare to write the file
+ object = bucket.objects[remote_filename]
+ object.write(:file => file, :acl => @canned_acl)
+ rescue AWS::Errors::Base => error
+ @logger.error("S3: AWS error", :error => error)
+ raise LogStash::Error, "AWS Configuration Error, #{error}"
+ end
- # if you lose connection with s3, bad control implementation.
- if ( @s3 == nil)
- aws_s3_config
+ @logger.debug("S3: has written remote file in bucket with canned ACL", :remote_filename => remote_filename, :bucket => @bucket, :canned_acl => @canned_acl)
end
- # find and use the bucket
- bucket = @s3.buckets[@bucket]
+ # This method creates a new empty temporary file to write events into.
+ public
+ def create_temporary_file
+ filename = File.join(@temporary_directory, get_temporary_filename(@page_counter))
- @logger.debug "S3: ready to write "+file_basename+" in bucket "+@bucket+", Fire in the hole!"
+ @logger.debug("S3: Creating a new temporary file", :filename => filename)
- # prepare for write the file
- object = bucket.objects[file_basename]
- object.write(:file => file_data, :acl => @canned_acl)
+ @file_rotation_lock.synchronize do
+ unless @tempfile.nil?
+ @tempfile.close
+ end
- @logger.debug "S3: has written "+file_basename+" in bucket "+@bucket + " with canned ACL \"" + @canned_acl + "\""
+ @tempfile = File.open(filename, "a")
+ end
+ end
- end
+ public
+ def register
+ require "aws-sdk"
+ # required if using ruby version < 2.0
+ # http://ruby.awsblog.com/post/Tx16QY1CI5GVBFT/Threading-with-the-AWS-SDK-for-Ruby
+ AWS.eager_autoload!(AWS::S3)
- # this method is used for create new path for name the file
- def getFinalPath
+ workers_not_supported
- @pass_time = Time.now
- return @temp_directory+"ls.s3."+Socket.gethostname+"."+(@pass_time).strftime("%Y-%m-%dT%H.%M")
+ @s3 = aws_s3_config
+ @upload_queue = Queue.new
+ @file_rotation_lock = Mutex.new
- end
+ if @prefix && @prefix =~ S3_INVALID_CHARACTERS
+ @logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS)
+ raise LogStash::ConfigurationError, "S3: prefix contains invalid characters"
+ end
- # This method is used for restore the previous crash of logstash or to prepare the files to send in bucket.
- # Take two parameter: flag and name. Flag indicate if you want to restore or not, name is the name of file
- def upFile(flag, name)
+ if !Dir.exist?(@temporary_directory)
+ FileUtils.mkdir_p(@temporary_directory)
+ end
- Dir[@temp_directory+name].each do |file|
- name_file = File.basename(file)
+ test_s3_write
- if (flag == true)
- @logger.warn "S3: have found temporary file: "+name_file+", something has crashed before... Prepare for upload in bucket!"
- end
+ restore_from_crashes if @restore == true
+ reset_page_counter
+ create_temporary_file
+ configure_periodic_rotation if time_file != 0
+ configure_upload_workers
- if (!File.zero?(file))
- write_on_bucket(file, name_file)
+ @codec.on_event do |event, encoded_event|
+ handle_event(encoded_event)
+ end
+ end
- if (flag == true)
- @logger.debug "S3: file: "+name_file+" restored on bucket "+@bucket
- else
- @logger.debug "S3: file: "+name_file+" was put on bucket "+@bucket
- end
- end
- File.delete (file)
+ # Use the same method that Amazon uses to check
+ # permissions on the user's bucket: create a small test file.
+ public
+ def test_s3_write
+ @logger.debug("S3: Creating a test file on S3")
- end
- end
+ test_filename = File.join(@temporary_directory,
+ "logstash-programmatic-access-test-object-#{Time.now.to_i}")
- # This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file.
- def newFile (flag)
+ File.open(test_filename, 'a') do |file|
+ file.write('test')
+ end
- if (flag == true)
- @current_final_path = getFinalPath
- @sizeCounter = 0
- end
+ begin
+ write_on_bucket(test_filename)
+ delete_on_bucket(test_filename)
+ ensure
+ File.delete(test_filename)
+ end
+ end
+
+ public
+ def restore_from_crashes
+ @logger.debug("S3: attempting to verify previous crashes...")
- if (@tags.size != 0)
- @tempFile = File.new(@current_final_path+".tag_"+@tag_path+"part"+@sizeCounter.to_s+".txt", "w")
- else
- @tempFile = File.new(@current_final_path+".part"+@sizeCounter.to_s+".txt", "w")
- end
+ Dir[File.join(@temporary_directory, "*.#{TEMPFILE_EXTENSION}")].each do |file|
+ name_file = File.basename(file)
+ @logger.warn("S3: found a temporary file left by a crashed upload process, uploading file to S3.", :filename => name_file)
+ move_file_to_bucket_async(file)
+ end
+ end
- end
+ public
+ def move_file_to_bucket(file)
+ if !File.zero?(file)
+ write_on_bucket(file)
+ @logger.debug("S3: file was put on the bucket", :filename => File.basename(file), :bucket => @bucket)
+ end
- public
- def register
- require "aws-sdk"
- @temp_directory = "/opt/logstash/S3_temp/"
+ begin
+ File.delete(file)
+ rescue Errno::ENOENT
+ # Something else deleted the file, logging but not raising the issue
+ @logger.warn("S3: Cannot delete the temporary file since it doesn't exist on disk", :filename => File.basename(file))
+ rescue Errno::EACCES
+ @logger.error("S3: Logstash doesn't have permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory)
+ end
+ end
- if (@tags.size != 0)
- @tag_path = ""
- for i in (0..@tags.size-1)
- @tag_path += @tags[i].to_s+"."
- end
- end
+ public
+ def periodic_interval
+ @time_file * 60
+ end
- if !(File.directory? @temp_directory)
- @logger.debug "S3: Directory "+@temp_directory+" doesn't exist, let's make it!"
- Dir.mkdir(@temp_directory)
- else
- @logger.debug "S3: Directory "+@temp_directory+" exist, nothing to do"
- end
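+ # e.g. with hypothetical host "web01", tags ["production"] and page 0, the
+ # method below returns "ls.s3.web01.2015-01-01T00.00.tag_production.part0.txt"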
+ public
+ def get_temporary_filename(page_counter = 0)
+ current_time = Time.now
+ filename = "ls.s3.#{Socket.gethostname}.#{current_time.strftime("%Y-%m-%dT%H.%M")}"
- if (@restore == true )
- @logger.debug "S3: is attempting to verify previous crashes..."
+ if @tags.size > 0
+ return "#{filename}.tag_#{@tags.join('.')}.part#{page_counter}.#{TEMPFILE_EXTENSION}"
+ else
+ return "#{filename}.part#{page_counter}.#{TEMPFILE_EXTENSION}"
+ end
+ end
- upFile(true, "*.txt")
- end
+ public
+ def receive(event)
+ return unless output?(event)
+ @codec.encode(event)
+ end
- newFile(true)
+ public
+ def rotate_events_log?
+ @tempfile.size > @size_file
+ end
- if (time_file != 0)
- first_time = true
- @thread = time_alert(@time_file*60) do
- if (first_time == false)
- @logger.debug "S3: time_file triggered, let's bucket the file if dosen't empty and create new file "
- upFile(false, File.basename(@tempFile))
- newFile(true)
- else
- first_time = false
- end
- end
- end
+ public
+ def write_events_to_multiple_files?
+ @size_file > 0
+ end
- end
+ public
+ def write_to_tempfile(event)
+ begin
+ @logger.debug("S3: put event into tempfile", :tempfile => File.basename(@tempfile))
- public
- def receive(event)
- return unless output?(event)
+ @file_rotation_lock.synchronize do
+ @tempfile.syswrite(event)
+ end
+ rescue Errno::ENOSPC
+ @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
+ teardown
+ end
+ end
- # Prepare format of Events
- if (@format == "plain")
- message = self.class.format_message(event)
- elsif (@format == "json")
- message = event.to_json
- else
- message = event.to_s
+ public
+ def teardown
+ shutdown_upload_workers
+ @periodic_rotation_thread.stop! if @periodic_rotation_thread
+
+ @tempfile.close
+ finished
end
- if(time_file !=0)
- @logger.debug "S3: trigger files after "+((@pass_time+60*time_file)-Time.now).to_s
+ private
+ def shutdown_upload_workers
+ @logger.debug("S3: Gracefully shutdown the upload workers")
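+ # A single ShutdownEvent is enough: each worker re-enqueues it for the next
+ # worker before stopping (see upload_worker below)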
+ @upload_queue << LogStash::ShutdownEvent
end
- # if specific the size
- if(size_file !=0)
+ private
+ def handle_event(encoded_event)
+ if write_events_to_multiple_files?
+ if rotate_events_log?
+ @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile))
- if (@tempFile.size < @size_file )
+ move_file_to_bucket_async(@tempfile.path)
+ next_page
+ create_temporary_file
+ else
+ @logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file)
+ end
+ end
- @logger.debug "S3: File have size: "+@tempFile.size.to_s+" and size_file is: "+ @size_file.to_s
- @logger.debug "S3: put event into: "+File.basename(@tempFile)
+ write_to_tempfile(encoded_event)
+ end
- # Put the event in the file, now!
- File.open(@tempFile, 'a') do |file|
- file.puts message
- file.write "\n"
- end
+ private
+ def configure_periodic_rotation
+ @periodic_rotation_thread = Stud::Task.new do
+ LogStash::Util::set_thread_name("<S3 periodic uploader")
- else
+ Stud.interval(periodic_interval, :sleep_then_run => true) do
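+ # :sleep_then_run makes Stud wait a full interval before the first rotation,
+ # so a freshly created file is not uploaded immediately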
+ @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path)
- @logger.debug "S3: file: "+File.basename(@tempFile)+" is too large, let's bucket it and create new file"
- upFile(false, File.basename(@tempFile))
- @sizeCounter += 1
- newFile(false)
+ move_file_to_bucket_async(@tempfile.path)
+ next_page
+ create_temporary_file
+ end
+ end
+ end
- end
+ private
+ def configure_upload_workers
+ @logger.debug("S3: Configure upload workers")
- # else we put all in one file
- else
+ @upload_workers = @upload_workers_count.times.map do |worker_id|
+ Stud::Task.new do
+ LogStash::Util::set_thread_name("<S3 upload worker #{worker_id}")
- @logger.debug "S3: put event into "+File.basename(@tempFile)
- File.open(@tempFile, 'a') do |file|
- file.puts message
- file.write "\n"
+ while true do
+ @logger.debug("S3: upload worker is waiting for a new file to upload.", :worker_id => worker_id)
+
+ upload_worker
+ end
+ end
end
end
- end
+ private
+ def upload_worker
+ file = @upload_queue.deq
- def self.format_message(event)
- message = "Date: #{event[LogStash::Event::TIMESTAMP]}\n"
- message << "Source: #{event["source"]}\n"
- message << "Tags: #{event["tags"].join(', ')}\n"
- message << "Fields: #{event.to_hash.inspect}\n"
- message << "Message: #{event["message"]}"
- end
+ case file
+ when LogStash::ShutdownEvent
+ @logger.debug("S3: upload worker is shutting down gracefully")
+ @upload_queue.enq(LogStash::ShutdownEvent)
+ else
+ @logger.debug("S3: upload worker is uploading a new file", :filename => File.basename(file))
+ move_file_to_bucket(file)
+ end
+ end
-end
+ private
+ def next_page
+ @page_counter += 1
+ end
-# Enjoy it, by Bistic:)
+ private
+ def reset_page_counter
+ @page_counter = 0
+ end
+
+ private
+ def delete_on_bucket(filename)
+ bucket = @s3.buckets[@bucket]
+
+ remote_filename = "#{@prefix}#{File.basename(filename)}"
+
+ @logger.debug("S3: delete file from bucket", :remote_filename => remote_filename, :bucket => @bucket)
+
+ begin
+ # prepare to delete the file
+ object = bucket.objects[remote_filename]
+ object.delete
+ rescue AWS::Errors::Base => e
+ @logger.error("S3: AWS error", :error => e)
+ raise LogStash::ConfigurationError, "AWS Configuration Error"
+ end
+ end
+
+ private
+ def move_file_to_bucket_async(file)
+ @logger.debug("S3: Sending the file to the upload queue.", :filename => File.basename(file))
+ @upload_queue.enq(file)
+ end
+end