lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-1.0.0 vs lib/logstash/inputs/cloudwatch_logs.rb in logstash-input-cloudwatch_logs-1.0.1
- old
+ new
@@ -108,20 +108,25 @@
# def run
public
def run(queue)
@queue = queue
+ @priority = []
_sincedb_open
determine_start_position(find_log_groups)
while !stop?
- groups = find_log_groups
+ begin
+ groups = find_log_groups
- groups.each do |group|
- @logger.debug("calling process_group on #{group}")
- process_group(group)
- end # groups.each
+ groups.each do |group|
+ @logger.debug("calling process_group on #{group}")
+ process_group(group)
+ end # groups.each
+ rescue Aws::CloudWatchLogs::Errors::ThrottlingException
+ @logger.debug("reached rate limit")
+ end
Stud.stoppable_sleep(@interval) { stop? }
end
end # def run
@@ -142,13 +147,19 @@
end
else
@logger.debug("log_group_prefix not enabled")
groups = @log_group
end
- groups
+ # Move the most recent groups to the end
+ groups.sort{|a,b| priority_of(a) <=> priority_of(b) }
end # def find_log_groups
+ private
+ def priority_of(group)
+ @priority.index(group) || -1
+ end
+
public
def determine_start_position(groups)
groups.each do |group|
if !@sincedb.member?(group)
case @start_position
@@ -187,9 +198,11 @@
_sincedb_write
next_token = resp.next_token
break if next_token.nil?
end
+ @priority.delete(group)
+ @priority << group
end #def process_group
# def process_log
private
def process_log(log, group)