lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.9.5 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.10.0
- old
+ new
@@ -6,12 +6,14 @@
module Fluent::Plugin
class CloudwatchLogsInput < Input
Fluent::Plugin.register_input('cloudwatch_logs', self)
- helpers :parser, :thread, :compat_parameters
+ helpers :parser, :thread, :compat_parameters, :storage
+ DEFAULT_STORAGE_TYPE = 'local'
+
config_param :aws_key_id, :string, default: nil, secret: true
config_param :aws_sec_key, :string, default: nil, secret: true
config_param :aws_use_sts, :bool, default: false
config_param :aws_sts_role_arn, :string, default: nil
config_param :aws_sts_session_name, :string, default: 'fluentd'
@@ -19,25 +21,33 @@
config_param :endpoint, :string, default: nil
config_param :tag, :string
config_param :log_group_name, :string
config_param :log_stream_name, :string, default: nil
config_param :use_log_stream_name_prefix, :bool, default: false
- config_param :state_file, :string
+ config_param :state_file, :string, default: nil,
deprecated: "Use <storage> instead."
config_param :fetch_interval, :time, default: 60
config_param :http_proxy, :string, default: nil
config_param :json_handler, :enum, list: [:yajl, :json], default: :yajl
config_param :use_todays_log_stream, :bool, default: false
config_param :use_aws_timestamp, :bool, default: false
config_param :start_time, :string, default: nil
config_param :end_time, :string, default: nil
config_param :time_range_format, :string, default: "%Y-%m-%d %H:%M:%S"
config_param :throttling_retry_seconds, :time, default: nil
+ config_param :include_metadata, :bool, default: false
config_section :parse do
config_set_default :@type, 'none'
end
+ config_section :storage do
+ config_set_default :usage, 'store_next_tokens'
+ config_set_default :@type, DEFAULT_STORAGE_TYPE
+ config_set_default :persistent, false
+ end
+
def initialize
super
@parser = nil
require 'aws-sdk-cloudwatchlogs'
@@ -51,10 +61,11 @@
@start_time = (Time.strptime(@start_time, @time_range_format).to_f * 1000).floor if @start_time
@end_time = (Time.strptime(@end_time, @time_range_format).to_f * 1000).floor if @end_time
if @start_time && @end_time && (@end_time < @start_time)
raise Fluent::ConfigError, "end_time(#{@end_time}) should be greater than start_time(#{@start_time})."
end
+ @next_token_storage = storage_create(usage: 'store_next_tokens', conf: config, default_type: DEFAULT_STORAGE_TYPE)
end
def start
super
options = {}
@@ -97,24 +108,32 @@
elsif parser_config = conf.elements('parse').first
@parser = parser_create(conf: parser_config)
end
end
- def state_file_for(log_stream_name)
- return "#{@state_file}_#{log_stream_name.gsub(File::SEPARATOR, '-')}" if log_stream_name
- return @state_file
# Builds the storage key (formerly the state-file path) for a log stream.
# When a stream name is given it is appended to @state_file with any
# path separators flattened to '-' so the key stays filesystem-safe;
# otherwise the bare @state_file value is returned unchanged.
def state_key_for(log_stream_name)
  return @state_file unless log_stream_name

  sanitized = log_stream_name.gsub(File::SEPARATOR, '-')
  "#{@state_file}_#{sanitized}"
end
# One-shot migration of a legacy `state_file` into the storage helper.
# Reads the next-token persisted by pre-0.10.0 releases at the path
# derived from state_key_for, stores it under the same name as a
# storage key, then deletes the file so the migration runs only once
# per stream.
def migrate_state_file_to_storage(log_stream_name)
  # Compute the key once; the original derived it three separate times.
  key = state_key_for(log_stream_name)
  @next_token_storage.put(:"#{key}", File.read(key).chomp)
  File.delete(key)
end
+
def next_token(log_stream_name)
- return nil unless File.exist?(state_file_for(log_stream_name))
- File.read(state_file_for(log_stream_name)).chomp
+ if @next_token_storage.persistent && File.exist?(state_key_for(log_stream_name))
+ migrate_state_file_to_storage(log_stream_name)
+ end
+ @next_token_storage.get(:"#{state_key_for(log_stream_name)}")
end
def store_next_token(token, log_stream_name = nil)
- File.open(state_file_for(log_stream_name), 'w') do |f|
- f.write token
- end
+ @next_token_storage.put(:"#{state_key_for(log_stream_name)}", token)
end
def run
@next_fetch_time = Time.now
@@ -128,40 +147,62 @@
log_streams = describe_log_streams(log_stream_name_prefix)
log_streams.concat(describe_log_streams(get_yesterdays_date)) if @use_todays_log_stream
log_streams.each do |log_stream|
log_stream_name = log_stream.log_stream_name
events = get_events(log_stream_name)
+ metadata = if @include_metadata
+ {
+ "log_stream_name" => log_stream_name,
+ "log_group_name" => @log_group_name
+ }
+ else
+ {}
+ end
events.each do |event|
- emit(log_stream_name, event)
+ emit(log_stream_name, event, metadata)
end
end
rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
log.warn "'#{@log_stream_name}' prefixed log stream(s) are not found"
next
end
else
events = get_events(@log_stream_name)
+ metadata = if @include_metadata
+ {
+ "log_stream_name" => @log_stream_name,
+ "log_group_name" => @log_group_name
+ }
+ else
+ {}
+ end
events.each do |event|
- emit(log_stream_name, event)
+ emit(log_stream_name, event, metadata)
end
end
end
sleep 1
end
end
- def emit(stream, event)
+ def emit(stream, event, metadata)
if @parser
@parser.parse(event.message) {|time,record|
if @use_aws_timestamp
time = (event.timestamp / 1000).floor
end
+ unless metadata.empty?
+ record.merge!("metadata" => metadata)
+ end
router.emit(@tag, time, record)
}
else
time = (event.timestamp / 1000).floor
begin
record = @json_handler.load(event.message)
+ unless metadata.empty?
+ record.merge!("metadata" => metadata)
+ end
router.emit(@tag, time, record)
rescue JSON::ParserError, Yajl::ParseError => error # Catch parser errors
log.error "Invalid JSON encountered while parsing event.message"
router.emit_error_event(@tag, time, { message: event.message }, error)
end