lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.6.0 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.6.1
- old
+ new
@@ -1,5 +1,6 @@
+require 'date'
require 'fluent/plugin/input'
require 'fluent/plugin/parser'
require 'yajl'
module Fluent::Plugin
@@ -15,16 +16,17 @@
config_param :aws_sts_session_name, :string, default: 'fluentd'
config_param :region, :string, :default => nil
config_param :endpoint, :string, :default => nil
config_param :tag, :string
config_param :log_group_name, :string
- config_param :log_stream_name, :string
+ config_param :log_stream_name, :string, :default => nil
config_param :use_log_stream_name_prefix, :bool, default: false
config_param :state_file, :string
config_param :fetch_interval, :time, default: 60
config_param :http_proxy, :string, default: nil
config_param :json_handler, :enum, list: [:yajl, :json], :default => :yajl
+ config_param :use_todays_log_stream, :bool, default: false
config_section :parse do
config_set_default :@type, 'none'
end
@@ -103,12 +105,14 @@
until @finished
if Time.now > @next_fetch_time
@next_fetch_time += @fetch_interval
- if @use_log_stream_name_prefix
- log_streams = describe_log_streams
+ if @use_log_stream_name_prefix || @use_todays_log_stream
+ log_stream_name_prefix = @use_todays_log_stream ? get_todays_date : @log_stream_name
+ log_streams = describe_log_streams(log_stream_name_prefix)
+ log_streams.concat(describe_log_streams(get_yesterdays_date)) if @use_todays_log_stream
log_streams.each do |log_stream|
log_stream_name = log_stream.log_stream_name
events = get_events(log_stream_name)
events.each do |event|
emit(log_stream_name, event)
@@ -140,31 +144,46 @@
def get_events(log_stream_name)
request = {
log_group_name: @log_group_name,
log_stream_name: log_stream_name
}
- request[:next_token] = next_token(log_stream_name) if next_token(log_stream_name)
+ log_next_token = next_token(log_stream_name)
+ request[:next_token] = log_next_token if !log_next_token.nil? && !log_next_token.empty?
response = @logs.get_log_events(request)
- store_next_token(response.next_forward_token, log_stream_name)
+ if valid_next_token(log_next_token, response.next_forward_token)
+ store_next_token(response.next_forward_token, log_stream_name)
+ end
response.events
end
- def describe_log_streams(log_streams = nil, next_token = nil)
+ def describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil)
request = {
log_group_name: @log_group_name
}
request[:next_token] = next_token if next_token
- request[:log_stream_name_prefix] = @log_stream_name
+ request[:log_stream_name_prefix] = log_stream_name_prefix
response = @logs.describe_log_streams(request)
if log_streams
log_streams.concat(response.log_streams)
else
log_streams = response.log_streams
end
if response.next_token
- log_streams = describe_log_streams(log_streams, response.next_token)
+ log_streams = describe_log_streams(log_stream_name_prefix, log_streams, response.next_token)
end
log_streams
+ end
+
+ def valid_next_token(prev_token, next_token)
+ return prev_token != next_token.chomp && !next_token.nil?
+ end
+
+ def get_todays_date
+ return Date.today.strftime("%Y/%m/%d")
+ end
+
+ def get_yesterdays_date
+ return (Date.today - 1).strftime("%Y/%m/%d")
end
end
end