lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-0.9.4 vs lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-0.10.0

- old
+ new

@@ -36,10 +36,14 @@ # Interval to wait between to check the file list again after a run is finished. # Value is in seconds. config :interval, :validate => :number, :default => 60 + # Decide if log_group is a prefix or an absolute name + config :log_group_prefix, :validate => :boolean, :default => false + + # def register public def register require "digest/md5" require "aws-sdk" @@ -60,43 +64,68 @@ end end # def run # def list_new_streams public - def list_new_streams(token = nil, objects = []) + def list_new_streams() + if @log_group_prefix + log_groups = @cloudwatch.describe_log_groups(log_group_name_prefix: @log_group) + groups = log_groups.log_groups.map {|n| n.log_group_name} + else + groups = [@log_group] + end + objects = [] + for log_group in groups + objects.concat(list_new_streams_for_log_group(log_group)) + end + objects + end + + # def list_new_streams_for_log_group + public + def list_new_streams_for_log_group(log_group, token = nil, objects = [], stepback=0) params = { - :log_group_name => @log_group, - :order_by => "LastEventTime", - :descending => false + :log_group_name => log_group, + :order_by => "LastEventTime", + :descending => false } + @logger.debug("CloudWatch Logs for log_group #{log_group}") + if token != nil params[:next_token] = token end - streams = @cloudwatch.describe_log_streams(params) + begin + streams = @cloudwatch.describe_log_streams(params) + rescue Aws::CloudWatchLogs::Errors::ThrottlingException + @logger.debug("CloudWatch Logs stepping back ", :stepback => 2 ** stepback * 60) + sleep(2 ** stepback * 60) + stepback += 1 + @logger.debug("CloudWatch Logs repeating list_new_streams again with token", :token => token) + return list_new_streams_for_log_group(log_group, token=token, objects=objects, stepback=stepback) + end objects.push(*streams.log_streams) if streams.next_token == nil @logger.debug("CloudWatch Logs hit end of tokens for streams") objects else @logger.debug("CloudWatch Logs calling list_new_streams again on token", :token => streams.next_token) - list_new_streams(streams.next_token, objects) + list_new_streams_for_log_group(log_group, streams.next_token, objects) end + end # def list_new_streams_for_log_group - end # def list_new_streams - # def process_log private def process_log(queue, log, stream) @codec.decode(log.message.to_str) do |event| event.set("@timestamp", parse_time(log.timestamp)) - event.set("[cloudwatch][ingestion_time]", parse_time(log.ingestion_time)) - event.set("[cloudwatch][log_group]", @log_group) - event.set("[cloudwatch][log_stream]", stream.log_stream_name) + event["[cloudwatch][ingestion_time]"] = parse_time(log.ingestion_time) + event["[cloudwatch][log_group]"] = stream.arn.split(/:/)[6] + event["[cloudwatch][log_stream]"] = stream.log_stream_name decorate(event) queue << event end end @@ -129,29 +158,38 @@ sincedb.write(current_window) end # def process_group # def process_log_stream private - def process_log_stream(queue, stream, last_read, current_window, token = nil) + def process_log_stream(queue, stream, last_read, current_window, token = nil, stepback=0) @logger.debug("CloudWatch Logs processing stream", :log_stream => stream.log_stream_name, - :log_group => @log_group, + :log_group => stream.arn.split(":")[6], :lastRead => last_read, :currentWindow => current_window, :token => token ) params = { - :log_group_name => @log_group, + :log_group_name => stream.arn.split(":")[6], :log_stream_name => stream.log_stream_name, :start_from_head => true } if token != nil params[:next_token] = token end - logs = @cloudwatch.get_log_events(params) + + begin + logs = @cloudwatch.get_log_events(params) + rescue Aws::CloudWatchLogs::Errors::ThrottlingException + @logger.debug("CloudWatch Logs stepping back ", :stepback => 2 ** stepback * 60) + sleep(2 ** stepback * 60) + stepback += 1 + @logger.debug("CloudWatch Logs repeating process_log_stream again with token", :token => token) + return process_log_stream(queue, stream, last_read, current_window, token, stepback) + end logs.events.each do |log| if log.ingestion_time > last_read process_log(queue, log, stream) end