lib/fluent/plugin/out_grassland.rb in fluent-plugin-grassland-0.2.0 vs lib/fluent/plugin/out_grassland.rb in fluent-plugin-grassland-0.2.1
- old
+ new
@@ -1,9 +1,12 @@
-module Fluent
- class GrasslandOutput < Fluent::BufferedOutput
- Fluent::Plugin.register_output('grassland', self)
+require 'syslog/logger'
+require 'fluent/plugin/output'
+module Fluent::Plugin
+ class GrasslandOutput < Fluent::Plugin::Output
+ Fluent::Plugin.register_output("grassland", self)
+
attr_accessor :random
attr_accessor :kinesis
attr_accessor :id, :stream_name, :access_key_id, :secret_access_key, :region, :sessionToken, :partitionKeys
def initialize
@@ -14,10 +17,14 @@
require 'json'
require 'logger'
require 'net/http'
require 'uri'
@random = Random.new
+
+ log = Syslog::Logger.new 'grasslandplugin'
+ log.info 'grassland initialize'
+ # puts "grassland initialize"
end
config_param :apiuri, :string, :default => 'https://grassland.biz/credentials'
# config_param :id, :string, :default => 'nil'
config_param :key, :string, :default => 'nil'
@@ -58,18 +65,18 @@
def resetAwsCredential()
begin
setCredential
configure_aws
- @kinesis.client.put_record({
+ @kinesis.put_record({
:stream_name => @stream_name,
:data => "test",
:partition_key => "#{random.rand(999)}"
})
- puts "fluentd: reset credential"
+ log.info "grassland: reset credential"
rescue => e
- puts [e.class, e].join(" : initialize error.")
+ log.info [e.class, e].join(" : initialize error.")
end
end
def setCredential()
credential = get_json("#{@apiuri}?key=#{@key}")
@@ -98,24 +105,24 @@
when Net::HTTPRedirection
location = response['location']
warn "redirected to #{location}"
get_json(location, limit - 1)
else
- puts [uri.to_s, response.value].join(" : ")
+ log.info [uri.to_s, response.value].join(" : ")
# handle error
end
rescue => e
- puts [uri.to_s, e.class, e].join(" : ")
+ log.info [uri.to_s, e.class, e].join(" : ")
# handle error
end
end
def format(tag, time, record)
# print(record)
['dt', 'd'].each do |key|
unless record.has_key?(key)
- puts "input data error: '#{key}' is required"
+ log.info "input data error: '#{key}' is required"
return ""
end
end
unless record.has_key?('pt')
record['pt'] = time
@@ -137,38 +144,40 @@
putBuf = ""
bufList = {}
begin
dataList.each do |data|
+ # debug log
+ # log.info data.to_json
if bufList[":#{data['pk']}"] == nil then
bufList[":#{data['pk']}"] = "#{data.to_json},"
else
bufList[":#{data['pk']}"] += "#{data.to_json},"
end
if bufList[":#{data['pk']}"].bytesize >= 30720 then
- @kinesis.client.put_record({
+ @kinesis.put_record({
:stream_name => @stream_name,
:data => "["+bufList[":#{data['pk']}"].chop+"]",
:partition_key => partitionKeys[random.rand(partitionKeys.length)]
# :partition_key => data['pk']
})
bufList.delete(":#{data['pk']}")
end
end
dataList.each do |data|
if bufList[":#{data['pk']}"] != nil then
- @kinesis.client.put_record({
+ @kinesis.put_record({
:stream_name => @stream_name,
:data => "["+bufList[":#{data['pk']}"].chop+"]",
:partition_key => partitionKeys[random.rand(partitionKeys.length)]
# :partition_key => data['pk']
})
bufList.delete(":#{data['pk']}")
end
end
rescue
- puts "error: put_record to grassland. maybe too many requests. few data dropped."
+ log.info "error: put_record to grassland. maybe too many requests. few data dropped."
end
end
private
@@ -186,10 +195,9 @@
:log_level => :debug,
#http_wire_trace => true
)
end
- @kinesis = AWS::Kinesis::Client.new(options)
- # AWS.config(options)
+ @kinesis = Aws::Kinesis::Client.new(options)
end
end
-end
+ end