lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.9.4 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.9.5
- old
+ new
@@ -168,50 +168,58 @@
end
end
end
def get_events(log_stream_name)
- request = {
- log_group_name: @log_group_name,
- log_stream_name: log_stream_name
- }
- request.merge!(start_time: @start_time) if @start_time
- request.merge!(end_time: @end_time) if @end_time
- log_next_token = next_token(log_stream_name)
- request[:next_token] = log_next_token if !log_next_token.nil? && !log_next_token.empty?
- response = @logs.get_log_events(request)
- if valid_next_token(log_next_token, response.next_forward_token)
- store_next_token(response.next_forward_token, log_stream_name)
+ throttling_handler('get_log_events') do
+ request = {
+ log_group_name: @log_group_name,
+ log_stream_name: log_stream_name
+ }
+ request.merge!(start_time: @start_time) if @start_time
+ request.merge!(end_time: @end_time) if @end_time
+ log_next_token = next_token(log_stream_name)
+ request[:next_token] = log_next_token if !log_next_token.nil? && !log_next_token.empty?
+ response = @logs.get_log_events(request)
+ if valid_next_token(log_next_token, response.next_forward_token)
+ store_next_token(response.next_forward_token, log_stream_name)
+ end
+
+ response.events
end
+ end
- response.events
+ def describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil)
+ throttling_handler('describe_log_streams') do
+ request = {
+ log_group_name: @log_group_name
+ }
+ request[:next_token] = next_token if next_token
+ request[:log_stream_name_prefix] = log_stream_name_prefix if log_stream_name_prefix
+ response = @logs.describe_log_streams(request)
+ if log_streams
+ log_streams.concat(response.log_streams)
+ else
+ log_streams = response.log_streams
+ end
+ if response.next_token
+ log_streams = describe_log_streams(log_stream_name_prefix, log_streams, response.next_token)
+ end
+ log_streams
+ end
+ end
+
+ def throttling_handler(method_name)
+ yield
rescue Aws::CloudWatchLogs::Errors::ThrottlingException => err
if throttling_retry_seconds
- log.warn "ThrottlingException in get_log_events (#{log_stream_name}). Waiting #{throttling_retry_seconds} seconds to retry."
+ log.warn "ThrottlingException #{method_name}. Waiting #{throttling_retry_seconds} seconds to retry."
sleep throttling_retry_seconds
- get_events(log_stream_name)
+ throttling_handler(method_name) { yield }
else
raise err
end
- end
-
- def describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil)
- request = {
- log_group_name: @log_group_name
- }
- request[:next_token] = next_token if next_token
- request[:log_stream_name_prefix] = log_stream_name_prefix if log_stream_name_prefix
- response = @logs.describe_log_streams(request)
- if log_streams
- log_streams.concat(response.log_streams)
- else
- log_streams = response.log_streams
- end
- if response.next_token
- log_streams = describe_log_streams(log_stream_name_prefix, log_streams, response.next_token)
- end
- log_streams
end
def valid_next_token(prev_token, next_token)
next_token && prev_token != next_token.chomp
end