lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.12.0 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.13.0
- old
+ new
@@ -20,10 +20,13 @@
config_param :aws_sts_endpoint_url, :string, default: nil
config_param :region, :string, default: nil
config_param :endpoint, :string, default: nil
config_param :tag, :string
config_param :log_group_name, :string
+ config_param :add_log_group_name, :bool, default: false
+ config_param :log_group_name_key, :string, default: 'log_group'
+ config_param :use_log_group_name_prefix, :bool, default: false
config_param :log_stream_name, :string, default: nil
config_param :use_log_stream_name_prefix, :bool, default: false
config_param :state_file, :string, default: nil,
deprecated: "Use <stroage> instead."
config_param :fetch_interval, :time, default: 60
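# A minimal sketch of the three new options in use, wired through Fluentd's
# input test driver; the tag, region, and group values here are hypothetical:
require 'fluent/test'
require 'fluent/test/driver/input'
require 'fluent/plugin/in_cloudwatch_logs'

Fluent::Test.setup
conf = %[
  tag cloudwatch.in
  region us-east-1
  log_group_name /ecs/my-app
  use_log_group_name_prefix true
  add_log_group_name true
  log_group_name_key log_group
]
driver = Fluent::Test::Driver::Input.new(Fluent::Plugin::CloudwatchLogsInput).configure(conf)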
@@ -126,108 +129,126 @@
def shutdown
@finished = true
super
end
+ # Intentionally not private so tests can call it directly
+ def state_key_for(log_stream_name, log_group_name = nil)
+ if log_group_name && log_stream_name
+ "#{@state_file}_#{log_group_name.gsub(File::SEPARATOR, '-')}_#{log_stream_name.gsub(File::SEPARATOR, '-')}"
+ elsif log_stream_name
+ "#{@state_file}_#{log_stream_name.gsub(File::SEPARATOR, '-')}"
+ else
+ @state_file
+ end
+ end
+
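# Illustration of the key scheme above: File::SEPARATOR in either name is
# flattened to '-' before both are appended to the state file path (values
# hypothetical):
state_file = '/var/lib/fluentd/cloudwatch.state'
group  = '/aws/lambda/my-fn'
stream = '2021/01/01/[$LATEST]abcdef'
"#{state_file}_#{group.gsub(File::SEPARATOR, '-')}_#{stream.gsub(File::SEPARATOR, '-')}"
# => "/var/lib/fluentd/cloudwatch.state_-aws-lambda-my-fn_2021-01-01-[$LATEST]abcdef"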
private
def configure_parser(conf)
if conf['format']
@parser = parser_create
elsif parser_config = conf.elements('parse').first
@parser = parser_create(conf: parser_config)
end
end
- def state_key_for(log_stream_name)
- if log_stream_name
- "#{@state_file}_#{log_stream_name.gsub(File::SEPARATOR, '-')}"
- else
- @state_file
- end
- end
-
def migrate_state_file_to_storage(log_stream_name)
@next_token_storage.put(:"#{state_key_for(log_stream_name)}", File.read(state_key_for(log_stream_name)).chomp)
File.delete(state_key_for(log_stream_name))
end
- def next_token(log_stream_name)
+ def next_token(log_stream_name, log_group_name = nil)
if @next_token_storage.persistent && File.exist?(state_key_for(log_stream_name))
migrate_state_file_to_storage(log_stream_name)
end
- @next_token_storage.get(:"#{state_key_for(log_stream_name)}")
+ @next_token_storage.get(:"#{state_key_for(log_stream_name, log_group_name)}")
end
- def store_next_token(token, log_stream_name = nil)
- @next_token_storage.put(:"#{state_key_for(log_stream_name)}", token)
+ def store_next_token(token, log_stream_name = nil, log_group_name = nil)
+ @next_token_storage.put(:"#{state_key_for(log_stream_name, log_group_name)}", token)
end
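# Read/write symmetry matters here: a token stored with a group name can only
# be found again by passing the same group name, because the group becomes
# part of the storage key. A hash-backed stand-in (all names hypothetical):
storage = {}
storage[:"state_-aws-lambda-my-fn_stream-1"] = 'f/3605...'  # put with group in key
storage[:"state_-aws-lambda-my-fn_stream-1"]                # => 'f/3605...'
storage[:"state_stream-1"]                                  # => nil; a group-less read misses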
def run
@next_fetch_time = Time.now
until @finished
if Time.now > @next_fetch_time
@next_fetch_time += @fetch_interval
- if @use_log_stream_name_prefix || @use_todays_log_stream
- log_stream_name_prefix = @use_todays_log_stream ? get_todays_date : @log_stream_name
- begin
- log_streams = describe_log_streams(log_stream_name_prefix)
- log_streams.concat(describe_log_streams(get_yesterdays_date)) if @use_todays_log_stream
- log_streams.each do |log_stream|
- log_stream_name = log_stream.log_stream_name
- events = get_events(log_stream_name)
- metadata = if @include_metadata
- {
- "log_stream_name" => log_stream_name,
- "log_group_name" => @log_group_name
- }
- else
- {}
- end
- events.each do |event|
- emit(log_stream_name, event, metadata)
+ if @use_log_group_name_prefix
+ log_group_names = describe_log_groups(@log_group_name).map{|log_group|
+ log_group.log_group_name
+ }
+ else
+ log_group_names = [@log_group_name]
+ end
+ log_group_names.each do |log_group_name|
+ if @use_log_stream_name_prefix || @use_todays_log_stream
+ log_stream_name_prefix = @use_todays_log_stream ? get_todays_date : @log_stream_name
+ begin
+ log_streams = describe_log_streams(log_stream_name_prefix)
+ log_streams.concat(describe_log_streams(get_yesterdays_date)) if @use_todays_log_stream
+ log_streams.each do |log_stream|
+ log_stream_name = log_stream.log_stream_name
+ events = get_events(log_group_name, log_stream_name)
+ metadata = if @include_metadata
+ {
+ "log_stream_name" => log_stream_name,
+ "log_group_name" => @log_group_name
+ }
+ else
+ {}
+ end
+ events.each do |event|
+ emit(log_group_name, log_stream_name, event, metadata)
+ end
end
+ rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
+ log.warn "'#{@log_stream_name}' prefixed log stream(s) are not found"
+ next
end
- rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
- log.warn "'#{@log_stream_name}' prefixed log stream(s) are not found"
- next
+ else
+ events = get_events(log_group_name, @log_stream_name)
+ metadata = if @include_metadata
+ {
+ "log_stream_name" => @log_stream_name,
+ "log_group_name" => @log_group_name
+ }
+ else
+ {}
+ end
+ events.each do |event|
+ emit(log_group_name, @log_stream_name, event, metadata)
+ end
end
- else
- events = get_events(@log_stream_name)
- metadata = if @include_metadata
- {
- "log_stream_name" => @log_stream_name,
- "log_group_name" => @log_group_name
- }
- else
- {}
- end
- events.each do |event|
- emit(log_stream_name, event, metadata)
- end
end
- end
sleep 1
+ end
end
end
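# Condensed shape of the new run loop above: one optional describe_log_groups
# pass fans out into the existing per-stream fetch. use_prefix, prefix,
# group_name, matching_group_names, and streams_for are hypothetical
# stand-ins for the plugin's config and describe_* calls:
groups = use_prefix ? matching_group_names(prefix) : [group_name]
groups.each do |group|
  streams_for(group).each do |stream|
    get_events(group, stream).each { |event| emit(group, stream, event, metadata) }
  end
end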
- def emit(stream, event, metadata)
+ def emit(group, stream, event, metadata)
if @parser
@parser.parse(event.message) {|time,record|
if @use_aws_timestamp
time = (event.timestamp / 1000).floor
end
+ if @add_log_group_name
+ record[@log_group_name_key] = group
+ end
unless metadata.empty?
record.merge!("metadata" => metadata)
end
router.emit(@tag, time, record)
}
else
time = (event.timestamp / 1000).floor
begin
record = @json_handler.load(event.message)
+ if @add_log_group_name
+ record[@log_group_name_key] = group
+ end
unless metadata.empty?
record.merge!("metadata" => metadata)
end
router.emit(@tag, time, record)
rescue JSON::ParserError, Yajl::ParseError => error # Catch parser errors
@@ -235,23 +256,27 @@
router.emit_error_event(@tag, time, { message: event.message }, error)
end
end
end
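# With add_log_group_name and include_metadata both enabled, a record emitted
# above takes roughly this shape (values hypothetical; the group lands under
# whatever key log_group_name_key names, 'log_group' by default):
record = {
  "message"   => "GET /health 200",
  "log_group" => "/ecs/my-app-web",
  "metadata"  => {
    "log_stream_name" => "web/task-1",
    "log_group_name"  => "/ecs/my-app-web"
  }
}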
- def get_events(log_stream_name)
+ def get_events(log_group_name, log_stream_name)
throttling_handler('get_log_events') do
request = {
- log_group_name: @log_group_name,
+ log_group_name: log_group_name,
log_stream_name: log_stream_name
}
request.merge!(start_time: @start_time) if @start_time
request.merge!(end_time: @end_time) if @end_time
- log_next_token = next_token(log_stream_name)
+ # Include the group in the state key only in prefix mode, matching the store below.
+ log_next_token = next_token(log_stream_name, @use_log_group_name_prefix ? log_group_name : nil)
request[:next_token] = log_next_token if !log_next_token.nil? && !log_next_token.empty?
response = @logs.get_log_events(request)
if valid_next_token(log_next_token, response.next_forward_token)
- store_next_token(response.next_forward_token, log_stream_name)
+ if @use_log_group_name_prefix
+ store_next_token(response.next_forward_token, log_stream_name, log_group_name)
+ else
+ store_next_token(response.next_forward_token, log_stream_name)
+ end
end
response.events
end
end
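# Shape of the token round trip above, assuming logs is an
# Aws::CloudWatchLogs::Client and persist/saved_token are hypothetical
# stand-ins. CloudWatch hands back the same next_forward_token once a stream
# is exhausted, which is why valid_next_token compares tokens before storing:
resp = logs.get_log_events(log_group_name: group, log_stream_name: stream,
                           next_token: saved_token)
persist(resp.next_forward_token) if saved_token != resp.next_forward_token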
@@ -285,9 +310,26 @@
throttling_handler(method_name) { yield }
else
raise err
end
+ end
+
+ def describe_log_groups(log_group_name_prefix, log_groups = nil, next_token = nil)
+ request = {
+ log_group_name_prefix: log_group_name_prefix
+ }
+ request[:next_token] = next_token if next_token
+ response = @logs.describe_log_groups(request)
+ if log_groups
+ log_groups.concat(response.log_groups)
+ else
+ log_groups = response.log_groups
+ end
+ if response.next_token
+ log_groups = describe_log_groups(log_group_name_prefix, log_groups, response.next_token)
+ end
+ log_groups
end
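# The recursion above can equivalently lean on the SDK's pageable responses;
# a sketch assuming logs is an Aws::CloudWatchLogs::Client:
log_groups = []
logs.describe_log_groups(log_group_name_prefix: prefix).each do |page|
  log_groups.concat(page.log_groups)
end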
def valid_next_token(prev_token, next_token)
next_token && prev_token != next_token.chomp
end