lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.6.0 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.6.1

- old
+ new

@@ -1,5 +1,6 @@ +require 'date' require 'fluent/plugin/input' require 'fluent/plugin/parser' require 'yajl' module Fluent::Plugin @@ -15,16 +16,17 @@ config_param :aws_sts_session_name, :string, default: 'fluentd' config_param :region, :string, :default => nil config_param :endpoint, :string, :default => nil config_param :tag, :string config_param :log_group_name, :string - config_param :log_stream_name, :string + config_param :log_stream_name, :string, :default => nil config_param :use_log_stream_name_prefix, :bool, default: false config_param :state_file, :string config_param :fetch_interval, :time, default: 60 config_param :http_proxy, :string, default: nil config_param :json_handler, :enum, list: [:yajl, :json], :default => :yajl + config_param :use_todays_log_stream, :bool, default: false config_section :parse do config_set_default :@type, 'none' end @@ -103,12 +105,14 @@ until @finished if Time.now > @next_fetch_time @next_fetch_time += @fetch_interval - if @use_log_stream_name_prefix - log_streams = describe_log_streams + if @use_log_stream_name_prefix || @use_todays_log_stream + log_stream_name_prefix = @use_todays_log_stream ? get_todays_date : @log_stream_name + log_streams = describe_log_streams(log_stream_name_prefix) + log_streams.concat(describe_log_streams(get_yesterdays_date)) if @use_todays_log_stream log_streams.each do |log_stream| log_stream_name = log_stream.log_stream_name events = get_events(log_stream_name) events.each do |event| emit(log_stream_name, event) @@ -140,31 +144,46 @@ def get_events(log_stream_name) request = { log_group_name: @log_group_name, log_stream_name: log_stream_name } - request[:next_token] = next_token(log_stream_name) if next_token(log_stream_name) + log_next_token = next_token(log_stream_name) + request[:next_token] = log_next_token if !log_next_token.nil? && !log_next_token.empty? response = @logs.get_log_events(request) - store_next_token(response.next_forward_token, log_stream_name) + if valid_next_token(log_next_token, response.next_forward_token) + store_next_token(response.next_forward_token, log_stream_name) + end response.events end - def describe_log_streams(log_streams = nil, next_token = nil) + def describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil) request = { log_group_name: @log_group_name } request[:next_token] = next_token if next_token - request[:log_stream_name_prefix] = @log_stream_name + request[:log_stream_name_prefix] = log_stream_name_prefix response = @logs.describe_log_streams(request) if log_streams log_streams.concat(response.log_streams) else log_streams = response.log_streams end if response.next_token - log_streams = describe_log_streams(log_streams, response.next_token) + log_streams = describe_log_streams(log_stream_name_prefix, log_streams, response.next_token) end log_streams + end + + def valid_next_token(prev_token, next_token) + return prev_token != next_token.chomp && !next_token.nil? + end + + def get_todays_date + return Date.today.strftime("%Y/%m/%d") + end + + def get_yesterdays_date + return (Date.today - 1).strftime("%Y/%m/%d") end end end