lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.5.0.pre1 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.5.0

- old
+ new

@@ -1,7 +1,8 @@ require 'fluent/plugin/input' require 'fluent/plugin/parser' +require 'yajl' module Fluent::Plugin class CloudwatchLogsInput < Input Fluent::Plugin.register_input('cloudwatch_logs', self) @@ -11,17 +12,19 @@ config_param :aws_sec_key, :string, :default => nil, :secret => true config_param :aws_use_sts, :bool, default: false config_param :aws_sts_role_arn, :string, default: nil config_param :aws_sts_session_name, :string, default: 'fluentd' config_param :region, :string, :default => nil + config_param :endpoint, :string, :default => nil config_param :tag, :string config_param :log_group_name, :string config_param :log_stream_name, :string config_param :use_log_stream_name_prefix, :bool, default: false config_param :state_file, :string config_param :fetch_interval, :time, default: 60 config_param :http_proxy, :string, default: nil + config_param :json_handler, :enum, list: [:yajl, :json], :default => :json config_section :parse do config_set_default :@type, 'none' end @@ -39,10 +42,11 @@ def start super options = {} options[:region] = @region if @region + options[:endpoint] = @endpoint if @endpoint options[:http_proxy] = @http_proxy if @http_proxy if @aws_use_sts Aws.config[:region] = options[:region] options[:credentials] = Aws::AssumeRoleCredentials.new( @@ -55,10 +59,17 @@ @logs = Aws::CloudWatchLogs::Client.new(options) @finished = false thread_create(:in_cloudwatch_logs_runner, &method(:run)) + + @json_handler = case @json_handler + when :yajl + Yajl + when :json + JSON + end end def shutdown @finished = true super @@ -119,10 +130,10 @@ @parser.parse(event.message) {|time, record| router.emit(@tag, time, record) } else time = (event.timestamp / 1000).floor - record = JSON.parse(event.message) + record = @json_handler.load(event.message) router.emit(@tag, time, record) end end def get_events(log_stream_name)