lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-0.9.4 vs lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-0.10.0
- old
+ new
@@ -36,10 +36,14 @@
# Interval to wait between to check the file list again after a run is finished.
# Value is in seconds.
config :interval, :validate => :number, :default => 60
+ # Decide if log_group is a prefix or an absolute name
+ config :log_group_prefix, :validate => :boolean, :default => false
+
+
# def register
public
def register
require "digest/md5"
require "aws-sdk"
@@ -60,43 +64,68 @@
end
end # def run
# def list_new_streams
public
- def list_new_streams(token = nil, objects = [])
+ def list_new_streams()
+ if @log_group_prefix
+ log_groups = @cloudwatch.describe_log_groups(log_group_name_prefix: @log_group)
+ groups = log_groups.log_groups.map {|n| n.log_group_name}
+ else
+ groups = [@log_group]
+ end
+ objects = []
+ for log_group in groups
+ objects.concat(list_new_streams_for_log_group(log_group))
+ end
+ objects
+ end
+
+ # def list_new_streams_for_log_group
+ public
+ def list_new_streams_for_log_group(log_group, token = nil, objects = [], stepback=0)
params = {
- :log_group_name => @log_group,
- :order_by => "LastEventTime",
- :descending => false
+ :log_group_name => log_group,
+ :order_by => "LastEventTime",
+ :descending => false
}
+ @logger.debug("CloudWatch Logs for log_group #{log_group}")
+
if token != nil
params[:next_token] = token
end
- streams = @cloudwatch.describe_log_streams(params)
+ begin
+ streams = @cloudwatch.describe_log_streams(params)
+ rescue Aws::CloudWatchLogs::Errors::ThrottlingException
+ @logger.debug("CloudWatch Logs stepping back ", :stepback => 2 ** stepback * 60)
+ sleep(2 ** stepback * 60)
+ stepback += 1
+ @logger.debug("CloudWatch Logs repeating list_new_streams again with token", :token => token)
+ return list_new_streams_for_log_group(log_group, token=token, objects=objects, stepback=stepback)
+ end
objects.push(*streams.log_streams)
if streams.next_token == nil
@logger.debug("CloudWatch Logs hit end of tokens for streams")
objects
else
@logger.debug("CloudWatch Logs calling list_new_streams again on token", :token => streams.next_token)
- list_new_streams(streams.next_token, objects)
+ list_new_streams_for_log_group(log_group, streams.next_token, objects)
end
+ end # def list_new_streams_for_log_group
- end # def list_new_streams
-
# def process_log
private
def process_log(queue, log, stream)
@codec.decode(log.message.to_str) do |event|
event.set("@timestamp", parse_time(log.timestamp))
- event.set("[cloudwatch][ingestion_time]", parse_time(log.ingestion_time))
- event.set("[cloudwatch][log_group]", @log_group)
- event.set("[cloudwatch][log_stream]", stream.log_stream_name)
+ event["[cloudwatch][ingestion_time]"] = parse_time(log.ingestion_time)
+ event["[cloudwatch][log_group]"] = stream.arn.split(/:/)[6]
+ event["[cloudwatch][log_stream]"] = stream.log_stream_name
decorate(event)
queue << event
end
end
@@ -129,29 +158,38 @@
sincedb.write(current_window)
end # def process_group
# def process_log_stream
private
- def process_log_stream(queue, stream, last_read, current_window, token = nil)
+ def process_log_stream(queue, stream, last_read, current_window, token = nil, stepback=0)
@logger.debug("CloudWatch Logs processing stream",
:log_stream => stream.log_stream_name,
- :log_group => @log_group,
+ :log_group => stream.arn.split(":")[6],
:lastRead => last_read,
:currentWindow => current_window,
:token => token
)
params = {
- :log_group_name => @log_group,
+ :log_group_name => stream.arn.split(":")[6],
:log_stream_name => stream.log_stream_name,
:start_from_head => true
}
if token != nil
params[:next_token] = token
end
- logs = @cloudwatch.get_log_events(params)
+
+ begin
+ logs = @cloudwatch.get_log_events(params)
+ rescue Aws::CloudWatchLogs::Errors::ThrottlingException
+ @logger.debug("CloudWatch Logs stepping back ", :stepback => 2 ** stepback * 60)
+ sleep(2 ** stepback * 60)
+ stepback += 1
+ @logger.debug("CloudWatch Logs repeating process_log_stream again with token", :token => token)
+ return process_log_stream(queue, stream, last_read, current_window, token, stepback)
+ end
logs.events.each do |log|
if log.ingestion_time > last_read
process_log(queue, log, stream)
end