lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.4.5 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.5.0.pre1
- old
+ new
@@ -1,21 +1,14 @@
-require 'fluent/input'
-require 'fluent/parser'
+require 'fluent/plugin/input'
+require 'fluent/plugin/parser'
-module Fluent
- require 'fluent/mixin/config_placeholders'
-
+module Fluent::Plugin
class CloudwatchLogsInput < Input
- Plugin.register_input('cloudwatch_logs', self)
+ Fluent::Plugin.register_input('cloudwatch_logs', self)
- include Fluent::Mixin::ConfigPlaceholders
+ helpers :parser, :thread, :compat_parameters
- # Define `router` method of v0.12 to support v0.10.57 or earlier
- unless method_defined?(:router)
- define_method("router") { Engine }
- end
-
config_param :aws_key_id, :string, :default => nil, :secret => true
config_param :aws_sec_key, :string, :default => nil, :secret => true
config_param :aws_use_sts, :bool, default: false
config_param :aws_sts_role_arn, :string, default: nil
config_param :aws_sts_session_name, :string, default: 'fluentd'
@@ -26,26 +19,28 @@
config_param :use_log_stream_name_prefix, :bool, default: false
config_param :state_file, :string
config_param :fetch_interval, :time, default: 60
config_param :http_proxy, :string, default: nil
+ config_section :parse do
+ config_set_default :@type, 'none'
+ end
+
def initialize
super
require 'aws-sdk-cloudwatchlogs'
end
- def placeholders
- [:percent]
- end
-
def configure(conf)
+ compat_parameters_convert(conf, :parser)
super
configure_parser(conf)
end
def start
+ super
options = {}
options[:region] = @region if @region
options[:http_proxy] = @http_proxy if @http_proxy
if @aws_use_sts
@@ -59,23 +54,22 @@
end
@logs = Aws::CloudWatchLogs::Client.new(options)
@finished = false
- @thread = Thread.new(&method(:run))
+ thread_create(:in_cloudwatch_logs_runner, &method(:run))
end
def shutdown
@finished = true
- @thread.join
+ super
end
private
def configure_parser(conf)
if conf['format']
- @parser = Fluent::TextParser.new
- @parser.configure(conf)
+ @parser = parser_create
end
end
def state_file_for(log_stream_name)
return "#{@state_file}_#{log_stream_name.gsub(File::SEPARATOR, '-')}" if log_stream_name
@@ -120,11 +114,12 @@
end
end
def emit(stream, event)
if @parser
- record = @parser.parse(event.message)
- router.emit(@tag, record[0], record[1])
+ @parser.parse(event.message) {|time, record|
+ router.emit(@tag, time, record)
+ }
else
time = (event.timestamp / 1000).floor
record = JSON.parse(event.message)
router.emit(@tag, time, record)
end