lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.4.5 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.5.0.pre1

- old
+ new

@@ -1,21 +1,14 @@ -require 'fluent/input' -require 'fluent/parser' +require 'fluent/plugin/input' +require 'fluent/plugin/parser' -module Fluent - require 'fluent/mixin/config_placeholders' - +module Fluent::Plugin class CloudwatchLogsInput < Input - Plugin.register_input('cloudwatch_logs', self) + Fluent::Plugin.register_input('cloudwatch_logs', self) - include Fluent::Mixin::ConfigPlaceholders + helpers :parser, :thread, :compat_parameters - # Define `router` method of v0.12 to support v0.10.57 or earlier - unless method_defined?(:router) - define_method("router") { Engine } - end - config_param :aws_key_id, :string, :default => nil, :secret => true config_param :aws_sec_key, :string, :default => nil, :secret => true config_param :aws_use_sts, :bool, default: false config_param :aws_sts_role_arn, :string, default: nil config_param :aws_sts_session_name, :string, default: 'fluentd' @@ -26,26 +19,28 @@ config_param :use_log_stream_name_prefix, :bool, default: false config_param :state_file, :string config_param :fetch_interval, :time, default: 60 config_param :http_proxy, :string, default: nil + config_section :parse do + config_set_default :@type, 'none' + end + def initialize super require 'aws-sdk-cloudwatchlogs' end - def placeholders - [:percent] - end - def configure(conf) + compat_parameters_convert(conf, :parser) super configure_parser(conf) end def start + super options = {} options[:region] = @region if @region options[:http_proxy] = @http_proxy if @http_proxy if @aws_use_sts @@ -59,23 +54,22 @@ end @logs = Aws::CloudWatchLogs::Client.new(options) @finished = false - @thread = Thread.new(&method(:run)) + thread_create(:in_cloudwatch_logs_runner, &method(:run)) end def shutdown @finished = true - @thread.join + super end private def configure_parser(conf) if conf['format'] - @parser = Fluent::TextParser.new - @parser.configure(conf) + @parser = parser_create end end def state_file_for(log_stream_name) return "#{@state_file}_#{log_stream_name.gsub(File::SEPARATOR, '-')}" if log_stream_name @@ -120,11 +114,12 @@ end end def emit(stream, event) if @parser - record = @parser.parse(event.message) - router.emit(@tag, record[0], record[1]) + @parser.parse(event.message) {|time, record| + router.emit(@tag, time, record) + } else time = (event.timestamp / 1000).floor record = JSON.parse(event.message) router.emit(@tag, time, record) end