lib/logstash/outputs/s3.rb in logstash-output-s3-0.1.1 vs lib/logstash/outputs/s3.rb in logstash-output-s3-0.1.2

- old
+ new

@@ -1,13 +1,16 @@ # encoding: utf-8 require "logstash/outputs/base" require "logstash/namespace" +require "logstash/plugin_mixins/aws_config" +require "stud/temporary" require "socket" # for Socket.gethostname +require "thread" +require "tmpdir" +require "fileutils" -# TODO integrate aws_config in the future -#require "logstash/plugin_mixins/aws_config" -# + # INFORMATION: # # This plugin was created for store the logstash's events into Amazon Simple Storage Service (Amazon S3). # For use it you needs authentications and an s3 bucket. # Be careful to have the permission to write file on S3's bucket and run logstash with super user for establish connection. @@ -32,36 +35,21 @@ # ##[Note] : # ## If you specify size_file and time_file then it will create file for each tag (if specified), when time_file or ## their size > size_file, it will be triggered then they will be pushed on s3's bucket and will delete from local disk. -# ## If you don't specify size_file, but time_file then it will create only one file for each tag (if specified). ## When time_file it will be triggered then the files will be pushed on s3's bucket and delete from local disk. # ## If you don't specify time_file, but size_file then it will create files for each tag (if specified), ## that will be triggered when their size > size_file, then they will be pushed on s3's bucket and will delete from local disk. # ## If you don't specific size_file and time_file you have a curios mode. It will create only one file for each tag (if specified). ## Then the file will be rest on temporary directory and don't will be pushed on bucket until we will restart logstash. # -# INFORMATION ABOUT CLASS: # -# I tried to comment the class at best i could do. -# I think there are much thing to improve, but if you want some points to develop here a list: -# -# TODO Integrate aws_config in the future -# TODO Find a method to push them all files when logtstash close the session. -# TODO Integrate @field on the path file -# TODO Permanent connection or on demand? For now on demand, but isn't a good implementation. -# Use a while or a thread to try the connection before break a time_out and signal an error. -# TODO If you have bugs report or helpful advice contact me, but remember that this code is much mine as much as yours, -# try to work on it if you want :) -# -# -# USAGE: -# +# #### Usage: # This is an example of logstash config: # [source,ruby] # output { # s3{ # access_key_id => "crazy_key" (required) @@ -71,287 +59,361 @@ # size_file => 2048 (optional) # time_file => 5 (optional) # format => "plain" (optional) # canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" ) # } -# } # -# We analize this: -# -# access_key_id => "crazy_key" -# Amazon will give you the key for use their service if you buy it or try it. (not very much open source anyway) -# -# secret_access_key => "monkey_access_key" -# Amazon will give you the secret_access_key for use their service if you buy it or try it . (not very much open source anyway). -# -# endpoint_region => "eu-west-1" -# When you make a contract with Amazon, you should know where the services you use. -# -# bucket => "boss_please_open_your_bucket" -# Be careful you have the permission to write on bucket and know the name. -# -# size_file => 2048 -# Means the size, in KB, of files who can store on temporary directory before you will be pushed on bucket. -# Is useful if you have a little server with poor space on disk and you don't want blow up the server with unnecessary temporary log files. -# -# time_file => 5 -# Means, in minutes, the time before the files will be pushed on bucket. Is useful if you want to push the files every specific time. -# -# format => "plain" -# Means the format of events you want to store in the files -# -# canned_acl => "private" -# The S3 canned ACL to use when putting the file. Defaults to "private". -# -# LET'S ROCK AND ROLL ON THE CODE! -# class LogStash::Outputs::S3 < LogStash::Outputs::Base - #TODO integrate aws_config in the future - # include LogStash::PluginMixins::AwsConfig + include LogStash::PluginMixins::AwsConfig - config_name "s3" - milestone 1 + TEMPFILE_EXTENSION = "txt" + S3_INVALID_CHARACTERS = /[\^`><]/ - # Aws access_key. - config :access_key_id, :validate => :string + config_name "s3" + milestone 1 + default :codec, 'line' - # Aws secret_access_key - config :secret_access_key, :validate => :string + # S3 bucket + config :bucket, :validate => :string - # S3 bucket - config :bucket, :validate => :string + # AWS endpoint_region + config :endpoint_region, :validate => ["us-east-1", "us-west-1", "us-west-2", + "eu-west-1", "ap-southeast-1", "ap-southeast-2", + "ap-northeast-1", "sa-east-1", "us-gov-west-1"], :deprecated => 'Deprecated, use region instead.' - # Aws endpoint_region - config :endpoint_region, :validate => ["us-east-1", "us-west-1", "us-west-2", - "eu-west-1", "ap-southeast-1", "ap-southeast-2", - "ap-northeast-1", "sa-east-1", "us-gov-west-1"], :default => "us-east-1" + # Set the size of file in bytes, this means that files on bucket when have dimension > file_size, they are stored in two or more file. + # If you have tags then it will generate a specific size file for every tags + ##NOTE: define size of file is the better thing, because generate a local temporary file on disk and then put it in bucket. + config :size_file, :validate => :number, :default => 0 - # Set the size of file in KB, this means that files on bucket when have dimension > file_size, they are stored in two or more file. - # If you have tags then it will generate a specific size file for every tags - ##NOTE: define size of file is the better thing, because generate a local temporary file on disk and then put it in bucket. - config :size_file, :validate => :number, :default => 0 + # Set the time, in minutes, to close the current sub_time_section of bucket. + # If you define file_size you have a number of files in consideration of the section and the current tag. + # 0 stay all time on listerner, beware if you specific 0 and size_file 0, because you will not put the file on bucket, + # for now the only thing this plugin can do is to put the file when logstash restart. + config :time_file, :validate => :number, :default => 0 - # Set the time, in minutes, to close the current sub_time_section of bucket. - # If you define file_size you have a number of files in consideration of the section and the current tag. - # 0 stay all time on listerner, beware if you specific 0 and size_file 0, because you will not put the file on bucket, - # for now the only thing this plugin can do is to put the file when logstash restart. - config :time_file, :validate => :number, :default => 0 + ## IMPORTANT: if you use multiple instance of s3, you should specify on one of them the "restore=> true" and on the others "restore => false". + ## This is hack for not destroy the new files after restoring the initial files. + ## If you do not specify "restore => true" when logstash crashes or is restarted, the files are not sent into the bucket, + ## for example if you have single Instance. + config :restore, :validate => :boolean, :default => false - # The event format you want to store in files. Defaults to plain text. - config :format, :validate => [ "json", "plain", "nil" ], :default => "plain" + # The S3 canned ACL to use when putting the file. Defaults to "private". + config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read"], + :default => "private" - ## IMPORTANT: if you use multiple instance of s3, you should specify on one of them the "restore=> true" and on the others "restore => false". - ## This is hack for not destroy the new files after restoring the initial files. - ## If you do not specify "restore => true" when logstash crashes or is restarted, the files are not sent into the bucket, - ## for example if you have single Instance. - config :restore, :validate => :boolean, :default => false + # Set the directory where logstash will store the tmp files before sending it to S3 + # default to the current OS temporary directory in linux /tmp/logstash + config :temporary_directory, :validate => :string, :default => File.join(Dir.tmpdir, "logstash") - # Aws canned ACL - config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read"], - :default => "private" + # Specify a prefix to the uploaded filename, this can simulate directories on S3 + config :prefix, :validate => :string, :default => '' - # Method to set up the aws configuration and establish connection - def aws_s3_config + # Specify how many workers to use to upload the files to S3 + config :upload_workers_count, :validate => :number, :default => 1 - @endpoint_region == 'us-east-1' ? @endpoint_region = 's3.amazonaws.com' : @endpoint_region = 's3-'+@endpoint_region+'.amazonaws.com' + # Exposed attributes for testing purpose. + attr_accessor :tempfile + attr_reader :page_counter + attr_reader :s3 - @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @endpoint_region) + def aws_s3_config + @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @region) + @s3 = AWS::S3.new(aws_options_hash) + end - AWS.config( - :access_key_id => @access_key_id, - :secret_access_key => @secret_access_key, - :s3_endpoint => @endpoint_region - ) - @s3 = AWS::S3.new + def aws_service_endpoint(region) + # Make the deprecated endpoint_region work + # TODO: (ph) Remove this after deprecation. + + if @endpoint_region + region_to_use = @endpoint_region + else + region_to_use = @region + end - end + return { + :s3_endpoint => region_to_use == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region_to_use}.amazonaws.com" + } + end - # This method is used to manage sleep and awaken thread. - def time_alert(interval) + public + def write_on_bucket(file) + # find and use the bucket + bucket = @s3.buckets[@bucket] - Thread.new do - loop do - start_time = Time.now - yield - elapsed = Time.now - start_time - sleep([interval - elapsed, 0].max) - end - end + remote_filename = "#{@prefix}#{File.basename(file)}" - end + @logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket) - # this method is used for write files on bucket. It accept the file and the name of file. - def write_on_bucket (file_data, file_basename) + begin + # prepare for write the file + object = bucket.objects[remote_filename] + object.write(:file => file, :acl => @canned_acl) + rescue AWS::Errors::Base => error + @logger.error("S3: AWS error", :error => error) + raise LogStash::Error, "AWS Configuration Error, #{error}" + end - # if you lose connection with s3, bad control implementation. - if ( @s3 == nil) - aws_s3_config + @logger.debug("S3: has written remote file in bucket with canned ACL", :remote_filename => remote_filename, :bucket => @bucket, :canned_acl => @canned_acl) end - # find and use the bucket - bucket = @s3.buckets[@bucket] + # This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file. + public + def create_temporary_file + filename = File.join(@temporary_directory, get_temporary_filename(@page_counter)) - @logger.debug "S3: ready to write "+file_basename+" in bucket "+@bucket+", Fire in the hole!" + @logger.debug("S3: Creating a new temporary file", :filename => filename) - # prepare for write the file - object = bucket.objects[file_basename] - object.write(:file => file_data, :acl => @canned_acl) + @file_rotation_lock.synchronize do + unless @tempfile.nil? + @tempfile.close + end - @logger.debug "S3: has written "+file_basename+" in bucket "+@bucket + " with canned ACL \"" + @canned_acl + "\"" + @tempfile = File.open(filename, "a") + end + end - end + public + def register + require "aws-sdk" + # required if using ruby version < 2.0 + # http://ruby.awsblog.com/post/Tx16QY1CI5GVBFT/Threading-with-the-AWS-SDK-for-Ruby + AWS.eager_autoload!(AWS::S3) - # this method is used for create new path for name the file - def getFinalPath + workers_not_supported - @pass_time = Time.now - return @temp_directory+"ls.s3."+Socket.gethostname+"."+(@pass_time).strftime("%Y-%m-%dT%H.%M") + @s3 = aws_s3_config + @upload_queue = Queue.new + @file_rotation_lock = Mutex.new - end + if @prefix && @prefix =~ S3_INVALID_CHARACTERS + @logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS) + raise LogStash::ConfigurationError, "S3: prefix contains invalid characters" + end - # This method is used for restore the previous crash of logstash or to prepare the files to send in bucket. - # Take two parameter: flag and name. Flag indicate if you want to restore or not, name is the name of file - def upFile(flag, name) + if !Dir.exist?(@temporary_directory) + FileUtils.mkdir_p(@temporary_directory) + end - Dir[@temp_directory+name].each do |file| - name_file = File.basename(file) + test_s3_write - if (flag == true) - @logger.warn "S3: have found temporary file: "+name_file+", something has crashed before... Prepare for upload in bucket!" - end + restore_from_crashes if @restore == true + reset_page_counter + create_temporary_file + configure_periodic_rotation if time_file != 0 + configure_upload_workers - if (!File.zero?(file)) - write_on_bucket(file, name_file) + @codec.on_event do |event, encoded_event| + handle_event(encoded_event) + end + end - if (flag == true) - @logger.debug "S3: file: "+name_file+" restored on bucket "+@bucket - else - @logger.debug "S3: file: "+name_file+" was put on bucket "+@bucket - end - end - File.delete (file) + # Use the same method that Amazon use to check + # permission on the user bucket by creating a small file + public + def test_s3_write + @logger.debug("S3: Creating a test file on S3") - end - end + test_filename = File.join(@temporary_directory, + "logstash-programmatic-access-test-object-#{Time.now.to_i}") - # This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file. - def newFile (flag) + File.open(test_filename, 'a') do |file| + file.write('test') + end - if (flag == true) - @current_final_path = getFinalPath - @sizeCounter = 0 - end + begin + write_on_bucket(test_filename) + delete_on_bucket(test_filename) + ensure + File.delete(test_filename) + end + end + + public + def restore_from_crashes + @logger.debug("S3: is attempting to verify previous crashes...") - if (@tags.size != 0) - @tempFile = File.new(@current_final_path+".tag_"+@tag_path+"part"+@sizeCounter.to_s+".txt", "w") - else - @tempFile = File.new(@current_final_path+".part"+@sizeCounter.to_s+".txt", "w") - end + Dir[File.join(@temporary_directory, "*.#{TEMPFILE_EXTENSION}")].each do |file| + name_file = File.basename(file) + @logger.warn("S3: have found temporary file the upload process crashed, uploading file to S3.", :filename => name_file) + move_file_to_bucket_async(file) + end + end - end + public + def move_file_to_bucket(file) + if !File.zero?(file) + write_on_bucket(file) + @logger.debug("S3: file was put on the upload thread", :filename => File.basename(file), :bucket => @bucket) + end - public - def register - require "aws-sdk" - @temp_directory = "/opt/logstash/S3_temp/" + begin + File.delete(file) + rescue Errno::ENOENT + # Something else deleted the file, logging but not raising the issue + @logger.warn("S3: Cannot delete the temporary file since it doesn't exist on disk", :filename => File.basename(file)) + rescue Errno::EACCES + @logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename, :temporary_directory => @temporary_directory) + end + end - if (@tags.size != 0) - @tag_path = "" - for i in (0..@tags.size-1) - @tag_path += @tags[i].to_s+"." - end - end + public + def periodic_interval + @time_file * 60 + end - if !(File.directory? @temp_directory) - @logger.debug "S3: Directory "+@temp_directory+" doesn't exist, let's make it!" - Dir.mkdir(@temp_directory) - else - @logger.debug "S3: Directory "+@temp_directory+" exist, nothing to do" - end + public + def get_temporary_filename(page_counter = 0) + current_time = Time.now + filename = "ls.s3.#{Socket.gethostname}.#{current_time.strftime("%Y-%m-%dT%H.%M")}" - if (@restore == true ) - @logger.debug "S3: is attempting to verify previous crashes..." + if @tags.size > 0 + return "#{filename}.tag_#{@tags.join('.')}.part#{page_counter}.#{TEMPFILE_EXTENSION}" + else + return "#{filename}.part#{page_counter}.#{TEMPFILE_EXTENSION}" + end + end - upFile(true, "*.txt") - end + public + def receive(event) + return unless output?(event) + @codec.encode(event) + end - newFile(true) + public + def rotate_events_log? + @tempfile.size > @size_file + end - if (time_file != 0) - first_time = true - @thread = time_alert(@time_file*60) do - if (first_time == false) - @logger.debug "S3: time_file triggered, let's bucket the file if dosen't empty and create new file " - upFile(false, File.basename(@tempFile)) - newFile(true) - else - first_time = false - end - end - end + public + def write_events_to_multiple_files? + @size_file > 0 + end - end + public + def write_to_tempfile(event) + begin + @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile)) - public - def receive(event) - return unless output?(event) + @file_rotation_lock.synchronize do + @tempfile.syswrite(event) + end + rescue Errno::ENOSPC + @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory) + teardown + end + end - # Prepare format of Events - if (@format == "plain") - message = self.class.format_message(event) - elsif (@format == "json") - message = event.to_json - else - message = event.to_s + public + def teardown + shutdown_upload_workers + @periodic_rotation_thread.stop! if @periodic_rotation_thread + + @tempfile.close + finished end - if(time_file !=0) - @logger.debug "S3: trigger files after "+((@pass_time+60*time_file)-Time.now).to_s + private + def shutdown_upload_workers + @logger.debug("S3: Gracefully shutdown the upload workers") + @upload_queue << LogStash::ShutdownEvent end - # if specific the size - if(size_file !=0) + private + def handle_event(encoded_event) + if write_events_to_multiple_files? + if rotate_events_log? + @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile)) - if (@tempFile.size < @size_file ) + move_file_to_bucket_async(@tempfile.path) + next_page + create_temporary_file + else + @logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file) + end + end - @logger.debug "S3: File have size: "+@tempFile.size.to_s+" and size_file is: "+ @size_file.to_s - @logger.debug "S3: put event into: "+File.basename(@tempFile) + write_to_tempfile(encoded_event) + end - # Put the event in the file, now! - File.open(@tempFile, 'a') do |file| - file.puts message - file.write "\n" - end + private + def configure_periodic_rotation + @periodic_rotation_thread = Stud::Task.new do + LogStash::Util::set_thread_name("<S3 periodic uploader") - else + Stud.interval(periodic_interval, :sleep_then_run => true) do + @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path) - @logger.debug "S3: file: "+File.basename(@tempFile)+" is too large, let's bucket it and create new file" - upFile(false, File.basename(@tempFile)) - @sizeCounter += 1 - newFile(false) + move_file_to_bucket_async(@tempfile.path) + next_page + create_temporary_file + end + end + end - end + private + def configure_upload_workers + @logger.debug("S3: Configure upload workers") - # else we put all in one file - else + @upload_workers = @upload_workers_count.times.map do |worker_id| + Stud::Task.new do + LogStash::Util::set_thread_name("<S3 upload worker #{worker_id}") - @logger.debug "S3: put event into "+File.basename(@tempFile) - File.open(@tempFile, 'a') do |file| - file.puts message - file.write "\n" + while true do + @logger.debug("S3: upload worker is waiting for a new file to upload.", :worker_id => worker_id) + + upload_worker + end + end end end - end + private + def upload_worker + file = @upload_queue.deq - def self.format_message(event) - message = "Date: #{event[LogStash::Event::TIMESTAMP]}\n" - message << "Source: #{event["source"]}\n" - message << "Tags: #{event["tags"].join(', ')}\n" - message << "Fields: #{event.to_hash.inspect}\n" - message << "Message: #{event["message"]}" - end + case file + when LogStash::ShutdownEvent + @logger.debug("S3: upload worker is shutting down gracefuly") + @upload_queue.enq(LogStash::ShutdownEvent) + else + @logger.debug("S3: upload working is uploading a new file", :filename => File.basename(file)) + move_file_to_bucket(file) + end + end -end + private + def next_page + @page_counter += 1 + end -# Enjoy it, by Bistic:) + private + def reset_page_counter + @page_counter = 0 + end + + private + def delete_on_bucket(filename) + bucket = @s3.buckets[@bucket] + + remote_filename = "#{@prefix}#{File.basename(filename)}" + + @logger.debug("S3: delete file from bucket", :remote_filename => remote_filename, :bucket => @bucket) + + begin + # prepare for write the file + object = bucket.objects[remote_filename] + object.delete + rescue AWS::Errors::Base => e + @logger.error("S3: AWS error", :error => e) + raise LogStash::ConfigurationError, "AWS Configuration Error" + end + end + + private + def move_file_to_bucket_async(file) + @logger.debug("S3: Sending the file to the upload queue.", :filename => File.basename(file)) + @upload_queue.enq(file) + end +end