lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-0.10.3 vs lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-1.0.0.pre

- line removed (present only in the old version, 0.10.3)
+ line added (present only in the new version, 1.0.0.pre)

@@ -2,15 +2,14 @@ require "logstash/inputs/base" require "logstash/namespace" require "logstash/plugin_mixins/aws_config" require "logstash/timestamp" require "time" -require "tmpdir" require "stud/interval" -require "stud/temporary" require "aws-sdk" -require "logstash/inputs/cloudwatch/patch" +require "logstash/inputs/cloudwatch_logs/patch" +require "fileutils" Aws.eager_autoload! # Stream events from CloudWatch Logs streams. # @@ -26,13 +25,13 @@ config_name "cloudwatch_logs" default :codec, "plain" - # Log group to pull logs from for this plugin. Will pull in all - # streams inside of this log group. - config :log_group, :validate => :string, :required => true + # Log group(s) to use as an input. If `log_group_prefix` is set + # to `true`, then each member of the array is treated as a prefix + config :log_group, :validate => :array, :required => true # Where to write the since database (keeps track of the date # the last handled log stream was updated). The default will write # sincedb files to some path matching "$HOME/.sincedb*" # Should be a path with filename not just a directory. @@ -48,207 +47,169 @@ # def register public def register require "digest/md5" + @logger.trace("Registering cloudwatch_logs input", :log_group => @log_group) + settings = defined?(LogStash::SETTINGS) ? LogStash::SETTINGS : nil + @sincedb = {} - @logger.info("Registering cloudwatch_logs input", :log_group => @log_group) - Aws::ConfigService::Client.new(aws_options_hash) - @cloudwatch = Aws::CloudWatchLogs::Client.new(aws_options_hash) + + if @sincedb_path.nil? + if settings + datapath = File.join(settings.get_value("path.data"), "plugins", "inputs", "cloudwatch_logs") + # Ensure that the filepath exists before writing, since it's deeply nested. 
+ FileUtils::mkdir_p datapath + @sincedb_path = File.join(datapath, ".sincedb_" + Digest::MD5.hexdigest(@log_group.join(","))) + end + end + + # This section is going to be deprecated eventually, as path.data will be + # the default, not an environment variable (SINCEDB_DIR or HOME) + if @sincedb_path.nil? # If it is _still_ nil... + if ENV["SINCEDB_DIR"].nil? && ENV["HOME"].nil? + @logger.error("No SINCEDB_DIR or HOME environment variable set, I don't know where " \ + "to keep track of the files I'm watching. Either set " \ + "HOME or SINCEDB_DIR in your environment, or set sincedb_path in " \ + "in your Logstash config for the file input with " \ + "path '#{@path.inspect}'") + raise + end + + #pick SINCEDB_DIR if available, otherwise use HOME + sincedb_dir = ENV["SINCEDB_DIR"] || ENV["HOME"] + + @sincedb_path = File.join(sincedb_dir, ".sincedb_" + Digest::MD5.hexdigest(@log_group.join(","))) + + @logger.info("No sincedb_path set, generating one based on the log_group setting", + :sincedb_path => @sincedb_path, :log_group => @log_group) + end + end #def register # def run public def run(queue) - while !stop? 
- process_group(queue) - Stud.stoppable_sleep(@interval) + @queue = queue + _sincedb_open + + Stud.interval(@interval) do + groups = find_log_groups + + groups.each do |group| + @logger.debug("calling process_group on #{group}") + process_group(group) + end # groups.each end + end # def run - # def list_new_streams public - def list_new_streams() + def find_log_groups if @log_group_prefix - log_groups = @cloudwatch.describe_log_groups(log_group_name_prefix: @log_group) - groups = log_groups.log_groups.map {|n| n.log_group_name} - while log_groups.next_token - log_groups = @cloudwatch.describe_log_groups(log_group_name_prefix: @log_group, next_token: log_groups.next_token) - groups += log_groups.log_groups.map {|n| n.log_group_name} + @logger.debug("log_group prefix is enabled, searching for log groups") + groups = [] + next_token = nil + @log_group.each do |group| + loop do + log_groups = @cloudwatch.describe_log_groups(log_group_name_prefix: group, next_token: next_token) + groups += log_groups.log_groups.map {|n| n.log_group_name} + next_token = log_groups.next_token + @logger.debug("found #{log_groups.log_groups.length} log groups matching prefix #{group}") + break if next_token.nil? 
+ end end else - groups = [@log_group] + @logger.debug("log_group_prefix not enabled") + groups = @log_group end - objects = [] - for log_group in groups - objects.concat(list_new_streams_for_log_group(log_group)) - end - objects - end + groups + end # def find_log_groups - # def list_new_streams_for_log_group - public - def list_new_streams_for_log_group(log_group, token = nil, objects = [], stepback=0) - params = { - :log_group_name => log_group, - :order_by => "LastEventTime", - :descending => false - } + private + def process_group(group) + next_token = nil + loop do + if !@sincedb.member?(group) + @sincedb[group] = 0 + end + params = { + :log_group_name => group, + :start_time => @sincedb[group], + :limit => 10, + :interleaved => true, + :next_token => next_token + } + resp = @cloudwatch.filter_log_events(params) - @logger.debug("CloudWatch Logs for log_group #{log_group}") + resp.events.each do |event| + process_log(event, group) + end - if token != nil - params[:next_token] = token - end + _sincedb_write - begin - streams = @cloudwatch.describe_log_streams(params) - rescue Aws::CloudWatchLogs::Errors::ThrottlingException - @logger.debug("CloudWatch Logs stepping back ", :stepback => 2 ** stepback * 60) - sleep(2 ** stepback * 60) - stepback += 1 - @logger.debug("CloudWatch Logs repeating list_new_streams again with token", :token => token) - return list_new_streams_for_log_group(log_group, token=token, objects=objects, stepback=stepback) + next_token = resp.next_token + break if next_token.nil? 
end + end #def process_group - objects.push(*streams.log_streams) - if streams.next_token == nil - @logger.debug("CloudWatch Logs hit end of tokens for streams") - objects - else - @logger.debug("CloudWatch Logs calling list_new_streams again on token", :token => streams.next_token) - list_new_streams_for_log_group(log_group, streams.next_token, objects) - end - end # def list_new_streams_for_log_group - # def process_log private - def process_log(queue, log, stream) + def process_log(log, group) @codec.decode(log.message.to_str) do |event| event.set("@timestamp", parse_time(log.timestamp)) - event.set("[cloudwatch][ingestion_time]", parse_time(log.ingestion_time)) - event.set("[cloudwatch][log_group]", stream.arn.split(/:/)[6]) - event.set("[cloudwatch][log_stream]", stream.log_stream_name) + event.set("[cloudwatch_logs][ingestion_time]", parse_time(log.ingestion_time)) + event.set("[cloudwatch_logs][log_group]", group) + event.set("[cloudwatch_logs][log_stream]", log.log_stream_name) + event.set("[cloudwatch_logs][event_id]", log.event_id) decorate(event) - queue << event + @queue << event + @sincedb[group] = log.timestamp + 1 end - end - # def process_log + end # def process_log # def parse_time private def parse_time(data) LogStash::Timestamp.at(data.to_i / 1000, (data.to_i % 1000) * 1000) end # def parse_time - # def process_group - public - def process_group(queue) - objects = list_new_streams - - last_read = sincedb.read - current_window = DateTime.now.strftime('%Q') - - if last_read < 0 - last_read = 1 - end - - objects.each do |stream| - if stream.last_ingestion_time && stream.last_ingestion_time > last_read - process_log_stream(queue, stream, last_read, current_window) + private + def _sincedb_open + begin + File.open(@sincedb_path) do |db| + @logger.debug? && @logger.debug("_sincedb_open: reading from #{@sincedb_path}") + db.each do |line| + group, pos = line.split(" ", 2) + @logger.debug? 
&& @logger.debug("_sincedb_open: setting #{group} to #{pos.to_i}") + @sincedb[group] = pos.to_i + end end + rescue + #No existing sincedb to load + @logger.debug? && @logger.debug("_sincedb_open: error: #{@sincedb_path}: #{$!}") end + end # def _sincedb_open - sincedb.write(current_window) - end # def process_group - - # def process_log_stream private - def process_log_stream(queue, stream, last_read, current_window, token = nil, stepback=0) - @logger.debug("CloudWatch Logs processing stream", - :log_stream => stream.log_stream_name, - :log_group => stream.arn.split(":")[6], - :lastRead => last_read, - :currentWindow => current_window, - :token => token - ) - - params = { - :log_group_name => stream.arn.split(":")[6], - :log_stream_name => stream.log_stream_name, - :start_from_head => true - } - - if token != nil - params[:next_token] = token - end - - + def _sincedb_write begin - logs = @cloudwatch.get_log_events(params) - rescue Aws::CloudWatchLogs::Errors::ThrottlingException - @logger.debug("CloudWatch Logs stepping back ", :stepback => 2 ** stepback * 60) - sleep(2 ** stepback * 60) - stepback += 1 - @logger.debug("CloudWatch Logs repeating process_log_stream again with token", :token => token) - return process_log_stream(queue, stream, last_read, current_window, token, stepback) + IO.write(@sincedb_path, serialize_sincedb, 0) + rescue Errno::EACCES + # probably no file handles free + # maybe it will work next time + @logger.debug? && @logger.debug("_sincedb_write: error: #{@sincedb_path}: #{$!}") end + end # def _sincedb_write - logs.events.each do |log| - if log.ingestion_time > last_read - process_log(queue, log, stream) - end - end - # if there are more pages, continue - if logs.events.count != 0 && logs.next_forward_token != nil - process_log_stream(queue, stream, last_read, current_window, logs.next_forward_token) - end - end # def process_log_stream - private - def sincedb - @sincedb ||= if @sincedb_path.nil? 
- @logger.info("Using default generated file for the sincedb", :filename => sincedb_file) - SinceDB::File.new(sincedb_file) - else - @logger.info("Using the provided sincedb_path", - :sincedb_path => @sincedb_path) - SinceDB::File.new(@sincedb_path) - end - end - - private - def sincedb_file - File.join(ENV["HOME"], ".sincedb_" + Digest::MD5.hexdigest("#{@log_group}")) - end - - module SinceDB - class File - def initialize(file) - @sincedb_path = file - end - - def newer?(date) - date > read - end - - def read - if ::File.exists?(@sincedb_path) - since = ::File.read(@sincedb_path).chomp.strip.to_i - else - since = 1 - end - return since - end - - def write(since = nil) - since = DateTime.now.strftime('%Q') if since.nil? - ::File.open(@sincedb_path, 'w') { |file| file.write(since.to_s) } - end - end + def serialize_sincedb + @sincedb.map do |group, pos| + [group, pos].join(" ") + end.join("\n") + "\n" end end # class LogStash::Inputs::CloudWatch_Logs