lib/fluent/plugin/out_grassland.rb in fluent-plugin-grassland-0.0.3 vs lib/fluent/plugin/out_grassland.rb in fluent-plugin-grassland-0.0.4
- old
+ new
@@ -1,25 +1,40 @@
module Fluent
class GrasslandOutput < Fluent::BufferedOutput
Fluent::Plugin.register_output('grassland', self)
- attr_accessor :stream_name, :access_key_id, :secret_access_key, :region
+ attr_accessor :random
+ attr_accessor :kinesis
+ attr_accessor :stream_name, :access_key_id, :secret_access_key, :region, :sessionToken, :partitionKeys
def initialize
super
require 'aws-sdk'
require 'base64'
require 'json'
require 'logger'
require 'net/http'
require 'uri'
+ @random = Random.new
end
- config_param :apiuri, :string, :default => 'https://grassland-api.elasticbeanstalk.com/credential'
- config_param :key, :string, :default => 'nil'
- config_param :debug, :bool, :default => false
+ config_param :apiuri, :string, :default => 'https://grassland.biz/credentials'
+ config_param :id, :string, :default => 'nil'
+ config_param :key, :string, :default => 'nil'
+ config_param :debug, :bool, :default => false
+ config_param :resetCredentialTimer, :integer, :default => 86400
+ # config_param :resetCredentialTimer, :integer, :default => 20
+ def set_interval(delay)
+ Thread.new do
+ loop do
+ sleep delay
+ yield # call passed block
+ end
+ end
+ end
+
def configure(conf)
super
[:key].each do |name|
unless self.instance_variable_get("@#{name}")
@@ -28,29 +43,43 @@
end
end
def start
super
- setCredential
- configure_aws
- AWS.kinesis.client.put_record({
- :stream_name => @stream_name,
- :data => "test",
- :partition_key => "#{rand(999)}"
- })
+ set_interval(@resetCredentialTimer){
+ resetAwsCredential
+ }
+ resetAwsCredential
end
def shutdown
super
end
+ def resetAwsCredential()
+ begin
+ setCredential
+ configure_aws
+ AWS.kinesis.client.put_record({
+ :stream_name => @stream_name,
+ :data => "test",
+ :partition_key => "#{random.rand(999)}"
+ })
+ puts "fluentd: reset credential"
+ rescue => e
+ puts [e.class, e].join(" : initialize error.")
+ end
+ end
+
def setCredential()
credential = get_json("#{@apiuri}?key=#{@key}")
@stream_name = credential['streamName']
@access_key_id = credential['accessKeyId']
@secret_access_key = credential['secretAccessKey']
@region = credential['region']
+ @sessionToken = credential['SessionToken']
+ @partitionKeys = credential['SessionToken']
end
def get_json(location, limit = 3)
raise ArgumentError, 'too many HTTP redirects' if limit == 0
uri = URI.parse(location)
@@ -78,19 +107,22 @@
end
end
def format(tag, time, record)
# print(record)
- ['cid', 'dt', 'uid', 'd'].each do |key|
+ ['dt', 'uid', 'd'].each do |key|
unless record.has_key?(key)
puts "input data error: '#{key}' is required"
return ""
end
end
unless record.has_key?('pt')
record['pt'] = time
end
+ unless record.has_key?('cid')
+ record['cid'] = @id
+ end
record['pk'] = record['cid'] + record['dt']
return "#{record.to_json},"
end
@@ -109,21 +141,23 @@
end
if bufList[":#{data['pk']}"].bytesize >= 30720 then
AWS.kinesis.client.put_record({
:stream_name => @stream_name,
:data => "["+bufList[":#{data['pk']}"].chop+"]",
- :partition_key => data['pk']
+ :partition_key => partitionKeys[random.rand(partitionKeys.length)]
+ # :partition_key => data['pk']
})
bufList.delete(":#{data['pk']}")
end
end
dataList.each do |data|
if bufList[":#{data['pk']}"] != nil then
AWS.kinesis.client.put_record({
:stream_name => @stream_name,
:data => "["+bufList[":#{data['pk']}"].chop+"]",
- :partition_key => data['pk']
+ :partition_key => partitionKeys[random.rand(partitionKeys.length)]
+ # :partition_key => data['pk']
})
bufList.delete(":#{data['pk']}")
end
end
rescue
@@ -135,10 +169,11 @@
def configure_aws
options = {
:access_key_id => @access_key_id,
:secret_access_key => @secret_access_key,
- :region => @region
+ :region => @region,
+ :session_token => @sessionToken
}
if @debug
options.update(
:logger => Logger.new($log.out),