lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.12.0 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.13.0

- old
+ new

@@ -20,10 +20,13 @@ config_param :aws_sts_endpoint_url, :string, default: nil config_param :region, :string, default: nil config_param :endpoint, :string, default: nil config_param :tag, :string config_param :log_group_name, :string + config_param :add_log_group_name, :bool, default: false + config_param :log_group_name_key, :string, default: 'log_group' + config_param :use_log_group_name_prefix, :bool, default: false config_param :log_stream_name, :string, default: nil config_param :use_log_stream_name_prefix, :bool, default: false config_param :state_file, :string, default: nil, deprecated: "Use <stroage> instead." config_param :fetch_interval, :time, default: 60 @@ -126,108 +129,126 @@ def shutdown @finished = true super end + # No private for testing + def state_key_for(log_stream_name, log_group_name = nil) + if log_group_name && log_stream_name + "#{@state_file}_#{log_group_name.gsub(File::SEPARATOR, '-')}_#{log_stream_name.gsub(File::SEPARATOR, '-')}" + elsif log_stream_name + "#{@state_file}_#{log_stream_name.gsub(File::SEPARATOR, '-')}" + else + @state_file + end + end + private def configure_parser(conf) if conf['format'] @parser = parser_create elsif parser_config = conf.elements('parse').first @parser = parser_create(conf: parser_config) end end - def state_key_for(log_stream_name) - if log_stream_name - "#{@state_file}_#{log_stream_name.gsub(File::SEPARATOR, '-')}" - else - @state_file - end - end - def migrate_state_file_to_storage(log_stream_name) @next_token_storage.put(:"#{state_key_for(log_stream_name)}", File.read(state_key_for(log_stream_name)).chomp) File.delete(state_key_for(log_stream_name)) end - def next_token(log_stream_name) + def next_token(log_stream_name, log_group_name = nil) if @next_token_storage.persistent && File.exist?(state_key_for(log_stream_name)) migrate_state_file_to_storage(log_stream_name) end - @next_token_storage.get(:"#{state_key_for(log_stream_name)}") + @next_token_storage.get(:"#{state_key_for(log_stream_name, log_group_name)}") end - def store_next_token(token, log_stream_name = nil) - @next_token_storage.put(:"#{state_key_for(log_stream_name)}", token) + def store_next_token(token, log_stream_name = nil, log_group_name = nil) + @next_token_storage.put(:"#{state_key_for(log_stream_name, log_group_name)}", token) end def run @next_fetch_time = Time.now until @finished if Time.now > @next_fetch_time @next_fetch_time += @fetch_interval - if @use_log_stream_name_prefix || @use_todays_log_stream - log_stream_name_prefix = @use_todays_log_stream ? get_todays_date : @log_stream_name - begin - log_streams = describe_log_streams(log_stream_name_prefix) - log_streams.concat(describe_log_streams(get_yesterdays_date)) if @use_todays_log_stream - log_streams.each do |log_stream| - log_stream_name = log_stream.log_stream_name - events = get_events(log_stream_name) - metadata = if @include_metadata - { - "log_stream_name" => log_stream_name, - "log_group_name" => @log_group_name - } - else - {} - end - events.each do |event| - emit(log_stream_name, event, metadata) + if @use_log_group_name_prefix + log_group_names = describe_log_groups(@log_group_name).map{|log_group| + log_group.log_group_name + } + else + log_group_names = [@log_group_name] + end + log_group_names.each do |log_group_name| + if @use_log_stream_name_prefix || @use_todays_log_stream + log_stream_name_prefix = @use_todays_log_stream ? get_todays_date : @log_stream_name + begin + log_streams = describe_log_streams(log_stream_name_prefix) + log_streams.concat(describe_log_streams(get_yesterdays_date)) if @use_todays_log_stream + log_streams.each do |log_stream| + log_stream_name = log_stream.log_stream_name + events = get_events(log_group_name, log_stream_name) + metadata = if @include_metadata + { + "log_stream_name" => log_stream_name, + "log_group_name" => @log_group_name + } + else + {} + end + events.each do |event| + emit(log_group_name, log_stream_name, event, metadata) + end end + rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException + log.warn "'#{@log_stream_name}' prefixed log stream(s) are not found" + next end - rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException - log.warn "'#{@log_stream_name}' prefixed log stream(s) are not found" - next + else + events = get_events(log_group_name, @log_stream_name) + metadata = if @include_metadata + { + "log_stream_name" => @log_stream_name, + "log_group_name" => @log_group_name + } + else + {} + end + events.each do |event| + emit(log_group_name, log_stream_name, event, metadata) + end end - else - events = get_events(@log_stream_name) - metadata = if @include_metadata - { - "log_stream_name" => @log_stream_name, - "log_group_name" => @log_group_name - } - else - {} - end - events.each do |event| - emit(log_stream_name, event, metadata) - end end - end sleep 1 + end end end - def emit(stream, event, metadata) + def emit(group, stream, event, metadata) if @parser @parser.parse(event.message) {|time,record| if @use_aws_timestamp time = (event.timestamp / 1000).floor end + if @add_log_group_name + record[@log_group_name_key] = group + end unless metadata.empty? record.merge!("metadata" => metadata) end router.emit(@tag, time, record) } else time = (event.timestamp / 1000).floor begin record = @json_handler.load(event.message) + if @add_log_group_name + record[@log_group_name_key] = group + end unless metadata.empty? record.merge!("metadata" => metadata) end router.emit(@tag, time, record) rescue JSON::ParserError, Yajl::ParseError => error # Catch parser errors @@ -235,23 +256,27 @@ router.emit_error_event(@tag, time, { message: event.message }, error) end end end - def get_events(log_stream_name) + def get_events(log_group_name, log_stream_name) throttling_handler('get_log_events') do request = { - log_group_name: @log_group_name, + log_group_name: log_group_name, log_stream_name: log_stream_name } request.merge!(start_time: @start_time) if @start_time request.merge!(end_time: @end_time) if @end_time - log_next_token = next_token(log_stream_name) + log_next_token = next_token(log_group_name, log_stream_name) request[:next_token] = log_next_token if !log_next_token.nil? && !log_next_token.empty? response = @logs.get_log_events(request) if valid_next_token(log_next_token, response.next_forward_token) - store_next_token(response.next_forward_token, log_stream_name) + if @use_log_group_name_prefix + store_next_token(response.next_forward_token, log_stream_name, log_group_name) + else + store_next_token(response.next_forward_token, log_stream_name) + end end response.events end end @@ -285,9 +310,26 @@ throttling_handler(method_name) { yield } else raise err end + end + + def describe_log_groups(log_group_name_prefix, log_groups = nil, next_token = nil) + request = { + log_group_name_prefix: log_group_name_prefix + } + request[:next_token] = next_token if next_token + response = @logs.describe_log_groups(request) + if log_groups + log_groups.concat(response.log_groups) + else + log_groups = response.log_groups + end + if response.next_token + log_groups = describe_log_groups(log_group_name_prefix, log_groups, response.next_token) + end + log_groups end def valid_next_token(prev_token, next_token) next_token && prev_token != next_token.chomp end