lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.4.0 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.4.1
- old
+ new
@@ -1,6 +1,7 @@
require 'fluent/input'
+require 'fluent/parser'
module Fluent
require 'fluent/mixin/config_placeholders'
class CloudwatchLogsInput < Input
@@ -69,24 +70,27 @@
end
private
def configure_parser(conf)
if conf['format']
- @parser = TextParser.new
+ @parser = Fluent::TextParser.new
@parser.configure(conf)
end
end
- def next_token
- return nil unless File.exist?(@state_file)
- File.read(@state_file).chomp
+ def state_file_for(log_stream_name)
+ return "#{@state_file}_#{log_stream_name.gsub(File::SEPARATOR, '-')}" if log_stream_name
+ return @state_file
end
+ def next_token(log_stream_name)
+ return nil unless File.exist?(state_file_for(log_stream_name))
+ File.read(state_file_for(log_stream_name)).chomp
+ end
+
def store_next_token(token, log_stream_name = nil)
- state_file = @state_file
- state_file = "#{@state_file}_#{log_stream_name}" if log_stream_name
- open(state_file, 'w') do |f|
+ open(state_file_for(log_stream_name), 'w') do |f|
f.write token
end
end
def run
@@ -100,25 +104,25 @@
log_streams = describe_log_streams
log_streams.each do |log_stram|
log_stream_name = log_stram.log_stream_name
events = get_events(log_stream_name)
events.each do |event|
- emit(event)
+ emit(log_stream_name, event)
end
end
else
events = get_events(@log_stream_name)
events.each do |event|
- emit(event)
+ emit(log_stream_name, event)
end
end
end
sleep 1
end
end
- def emit(event)
+ def emit(stream, event)
if @parser
record = @parser.parse(event.message)
router.emit(@tag, record[0], record[1])
else
time = (event.timestamp / 1000).floor
@@ -130,10 +134,10 @@
def get_events(log_stream_name)
request = {
log_group_name: @log_group_name,
log_stream_name: log_stream_name
}
- request[:next_token] = next_token if next_token
+ request[:next_token] = next_token(log_stream_name) if next_token(log_stream_name)
response = @logs.get_log_events(request)
store_next_token(response.next_forward_token, log_stream_name)
response.events
end