lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.4.0 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.4.1

- old
+ new

@@ -1,6 +1,7 @@ require 'fluent/input' +require 'fluent/parser' module Fluent require 'fluent/mixin/config_placeholders' class CloudwatchLogsInput < Input @@ -69,24 +70,27 @@ end private def configure_parser(conf) if conf['format'] - @parser = TextParser.new + @parser = Fluent::TextParser.new @parser.configure(conf) end end - def next_token - return nil unless File.exist?(@state_file) - File.read(@state_file).chomp + def state_file_for(log_stream_name) + return "#{@state_file}_#{log_stream_name.gsub(File::SEPARATOR, '-')}" if log_stream_name + return @state_file end + def next_token(log_stream_name) + return nil unless File.exist?(state_file_for(log_stream_name)) + File.read(state_file_for(log_stream_name)).chomp + end + def store_next_token(token, log_stream_name = nil) - state_file = @state_file - state_file = "#{@state_file}_#{log_stream_name}" if log_stream_name - open(state_file, 'w') do |f| + open(state_file_for(log_stream_name), 'w') do |f| f.write token end end def run @@ -100,25 +104,25 @@ log_streams = describe_log_streams log_streams.each do |log_stram| log_stream_name = log_stram.log_stream_name events = get_events(log_stream_name) events.each do |event| - emit(event) + emit(log_stream_name, event) end end else events = get_events(@log_stream_name) events.each do |event| - emit(event) + emit(log_stream_name, event) end end end sleep 1 end end - def emit(event) + def emit(stream, event) if @parser record = @parser.parse(event.message) router.emit(@tag, record[0], record[1]) else time = (event.timestamp / 1000).floor @@ -130,10 +134,10 @@ def get_events(log_stream_name) request = { log_group_name: @log_group_name, log_stream_name: log_stream_name } - request[:next_token] = next_token if next_token + request[:next_token] = next_token(log_stream_name) if next_token(log_stream_name) response = @logs.get_log_events(request) store_next_token(response.next_forward_token, log_stream_name) response.events end