lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.9.4 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.9.5

- old
+ new

@@ -168,50 +168,58 @@ end end end def get_events(log_stream_name) - request = { - log_group_name: @log_group_name, - log_stream_name: log_stream_name - } - request.merge!(start_time: @start_time) if @start_time - request.merge!(end_time: @end_time) if @end_time - log_next_token = next_token(log_stream_name) - request[:next_token] = log_next_token if !log_next_token.nil? && !log_next_token.empty? - response = @logs.get_log_events(request) - if valid_next_token(log_next_token, response.next_forward_token) - store_next_token(response.next_forward_token, log_stream_name) + throttling_handler('get_log_events') do + request = { + log_group_name: @log_group_name, + log_stream_name: log_stream_name + } + request.merge!(start_time: @start_time) if @start_time + request.merge!(end_time: @end_time) if @end_time + log_next_token = next_token(log_stream_name) + request[:next_token] = log_next_token if !log_next_token.nil? && !log_next_token.empty? + response = @logs.get_log_events(request) + if valid_next_token(log_next_token, response.next_forward_token) + store_next_token(response.next_forward_token, log_stream_name) + end + + response.events end + end - response.events + def describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil) + throttling_handler('describe_log_streams') do + request = { + log_group_name: @log_group_name + } + request[:next_token] = next_token if next_token + request[:log_stream_name_prefix] = log_stream_name_prefix if log_stream_name_prefix + response = @logs.describe_log_streams(request) + if log_streams + log_streams.concat(response.log_streams) + else + log_streams = response.log_streams + end + if response.next_token + log_streams = describe_log_streams(log_stream_name_prefix, log_streams, response.next_token) + end + log_streams + end + end + + def throttling_handler(method_name) + yield rescue Aws::CloudWatchLogs::Errors::ThrottlingException => err if throttling_retry_seconds - log.warn "ThrottlingException in get_log_events (#{log_stream_name}). Waiting #{throttling_retry_seconds} seconds to retry." + log.warn "ThrottlingException #{method_name}. Waiting #{throttling_retry_seconds} seconds to retry." sleep throttling_retry_seconds - get_events(log_stream_name) + throttling_handler(method_name) { yield } else raise err end - end - - def describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil) - request = { - log_group_name: @log_group_name - } - request[:next_token] = next_token if next_token - request[:log_stream_name_prefix] = log_stream_name_prefix if log_stream_name_prefix - response = @logs.describe_log_streams(request) - if log_streams - log_streams.concat(response.log_streams) - else - log_streams = response.log_streams - end - if response.next_token - log_streams = describe_log_streams(log_stream_name_prefix, log_streams, response.next_token) - end - log_streams end def valid_next_token(prev_token, next_token) next_token && prev_token != next_token.chomp end