lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.9.5 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.10.0

- old
+ new

@@ -6,12 +6,14 @@ module Fluent::Plugin
   class CloudwatchLogsInput < Input
     Fluent::Plugin.register_input('cloudwatch_logs', self)
 
-    helpers :parser, :thread, :compat_parameters
+    helpers :parser, :thread, :compat_parameters, :storage
 
+    DEFAULT_STORAGE_TYPE = 'local'
+
     config_param :aws_key_id, :string, default: nil, secret: true
     config_param :aws_sec_key, :string, default: nil, secret: true
     config_param :aws_use_sts, :bool, default: false
     config_param :aws_sts_role_arn, :string, default: nil
     config_param :aws_sts_session_name, :string, default: 'fluentd'
@@ -19,25 +21,33 @@
     config_param :endpoint, :string, default: nil
     config_param :tag, :string
     config_param :log_group_name, :string
     config_param :log_stream_name, :string, default: nil
     config_param :use_log_stream_name_prefix, :bool, default: false
-    config_param :state_file, :string
+    config_param :state_file, :string, default: nil,
+                 deprecated: "Use <stroage> instead."
     config_param :fetch_interval, :time, default: 60
     config_param :http_proxy, :string, default: nil
     config_param :json_handler, :enum, list: [:yajl, :json], default: :yajl
     config_param :use_todays_log_stream, :bool, default: false
     config_param :use_aws_timestamp, :bool, default: false
     config_param :start_time, :string, default: nil
    config_param :end_time, :string, default: nil
     config_param :time_range_format, :string, default: "%Y-%m-%d %H:%M:%S"
     config_param :throttling_retry_seconds, :time, default: nil
+    config_param :include_metadata, :bool, default: false
 
     config_section :parse do
       config_set_default :@type, 'none'
     end
 
+    config_section :storage do
+      config_set_default :usage, 'store_next_tokens'
+      config_set_default :@type, DEFAULT_STORAGE_TYPE
+      config_set_default :persistent, false
+    end
+
     def initialize
       super
 
       @parser = nil
       require 'aws-sdk-cloudwatchlogs'
@@ -51,10 +61,11 @@
       @start_time = (Time.strptime(@start_time, @time_range_format).to_f * 1000).floor if @start_time
       @end_time = (Time.strptime(@end_time, @time_range_format).to_f * 1000).floor if @end_time
       if @start_time && @end_time && (@end_time < @start_time)
         raise Fluent::ConfigError, "end_time(#{@end_time}) should be greater than start_time(#{@start_time})."
       end
+      @next_token_storage = storage_create(usage: 'store_next_tokens', conf: config, default_type: DEFAULT_STORAGE_TYPE)
     end
 
     def start
       super
 
       options = {}
@@ -97,24 +108,32 @@
       elsif parser_config = conf.elements('parse').first
         @parser = parser_create(conf: parser_config)
       end
     end
 
-    def state_file_for(log_stream_name)
-      return "#{@state_file}_#{log_stream_name.gsub(File::SEPARATOR, '-')}" if log_stream_name
-      return @state_file
+    def state_key_for(log_stream_name)
+      if log_stream_name
+        "#{@state_file}_#{log_stream_name.gsub(File::SEPARATOR, '-')}"
+      else
+        @state_file
+      end
     end
 
+    def migrate_state_file_to_storage(log_stream_name)
+      @next_token_storage.put(:"#{state_key_for(log_stream_name)}", File.read(state_key_for(log_stream_name)).chomp)
+      File.delete(state_key_for(log_stream_name))
+    end
+
     def next_token(log_stream_name)
-      return nil unless File.exist?(state_file_for(log_stream_name))
-      File.read(state_file_for(log_stream_name)).chomp
+      if @next_token_storage.persistent && File.exist?(state_key_for(log_stream_name))
+        migrate_state_file_to_storage(log_stream_name)
+      end
+      @next_token_storage.get(:"#{state_key_for(log_stream_name)}")
     end
 
     def store_next_token(token, log_stream_name = nil)
-      File.open(state_file_for(log_stream_name), 'w') do |f|
-        f.write token
-      end
+      @next_token_storage.put(:"#{state_key_for(log_stream_name)}", token)
     end
 
     def run
       @next_fetch_time = Time.now
@@ -128,40 +147,62 @@
             log_streams = describe_log_streams(log_stream_name_prefix)
             log_streams.concat(describe_log_streams(get_yesterdays_date)) if @use_todays_log_stream
             log_streams.each do |log_stream|
               log_stream_name = log_stream.log_stream_name
               events = get_events(log_stream_name)
+              metadata = if @include_metadata
+                           {
+                             "log_stream_name" => log_stream_name,
+                             "log_group_name" => @log_group_name
+                           }
+                         else
+                           {}
+                         end
               events.each do |event|
-                emit(log_stream_name, event)
+                emit(log_stream_name, event, metadata)
               end
             end
           rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
            log.warn "'#{@log_stream_name}' prefixed log stream(s) are not found"
            next
          end
         else
           events = get_events(@log_stream_name)
+          metadata = if @include_metadata
+                       {
+                         "log_stream_name" => @log_stream_name,
+                         "log_group_name" => @log_group_name
+                       }
+                     else
+                       {}
+                     end
           events.each do |event|
-            emit(log_stream_name, event)
+            emit(log_stream_name, event, metadata)
           end
         end
       end
       sleep 1
     end
   end
 
-    def emit(stream, event)
+    def emit(stream, event, metadata)
       if @parser
         @parser.parse(event.message) {|time,record|
           if @use_aws_timestamp
             time = (event.timestamp / 1000).floor
           end
+          unless metadata.empty?
+            record.merge!("metadata" => metadata)
+          end
           router.emit(@tag, time, record)
         }
       else
         time = (event.timestamp / 1000).floor
         begin
           record = @json_handler.load(event.message)
+          unless metadata.empty?
+            record.merge!("metadata" => metadata)
+          end
           router.emit(@tag, time, record)
         rescue JSON::ParserError, Yajl::ParseError => error # Catch parser errors
           log.error "Invalid JSON encountered while parsing event.message"
           router.emit_error_event(@tag, time, { message: event.message }, error)
         end