lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.5.0.pre1 vs lib/fluent/plugin/in_cloudwatch_logs.rb in fluent-plugin-cloudwatch-logs-0.5.0
- old
+ new
@@ -1,7 +1,8 @@
require 'fluent/plugin/input'
require 'fluent/plugin/parser'
+require 'yajl'
module Fluent::Plugin
class CloudwatchLogsInput < Input
Fluent::Plugin.register_input('cloudwatch_logs', self)
@@ -11,17 +12,19 @@
config_param :aws_sec_key, :string, :default => nil, :secret => true
config_param :aws_use_sts, :bool, default: false
config_param :aws_sts_role_arn, :string, default: nil
config_param :aws_sts_session_name, :string, default: 'fluentd'
config_param :region, :string, :default => nil
+ config_param :endpoint, :string, :default => nil
config_param :tag, :string
config_param :log_group_name, :string
config_param :log_stream_name, :string
config_param :use_log_stream_name_prefix, :bool, default: false
config_param :state_file, :string
config_param :fetch_interval, :time, default: 60
config_param :http_proxy, :string, default: nil
+ config_param :json_handler, :enum, list: [:yajl, :json], :default => :json
config_section :parse do
config_set_default :@type, 'none'
end
@@ -39,10 +42,11 @@
def start
super
options = {}
options[:region] = @region if @region
+ options[:endpoint] = @endpoint if @endpoint
options[:http_proxy] = @http_proxy if @http_proxy
if @aws_use_sts
Aws.config[:region] = options[:region]
options[:credentials] = Aws::AssumeRoleCredentials.new(
@@ -55,10 +59,17 @@
@logs = Aws::CloudWatchLogs::Client.new(options)
@finished = false
thread_create(:in_cloudwatch_logs_runner, &method(:run))
+
+ @json_handler = case @json_handler
+ when :yajl
+ Yajl
+ when :json
+ JSON
+ end
end
def shutdown
@finished = true
super
@@ -119,10 +130,10 @@
@parser.parse(event.message) {|time, record|
router.emit(@tag, time, record)
}
else
time = (event.timestamp / 1000).floor
- record = JSON.parse(event.message)
+ record = @json_handler.load(event.message)
router.emit(@tag, time, record)
end
end
def get_events(log_stream_name)