lib/fluent/plugin/out_grassland.rb in fluent-plugin-grassland-0.0.3 vs lib/fluent/plugin/out_grassland.rb in fluent-plugin-grassland-0.0.4

- old
+ new

@@ -1,25 +1,40 @@ module Fluent class GrasslandOutput < Fluent::BufferedOutput Fluent::Plugin.register_output('grassland', self) - attr_accessor :stream_name, :access_key_id, :secret_access_key, :region + attr_accessor :random + attr_accessor :kinesis + attr_accessor :stream_name, :access_key_id, :secret_access_key, :region, :sessionToken, :partitionKeys def initialize super require 'aws-sdk' require 'base64' require 'json' require 'logger' require 'net/http' require 'uri' + @random = Random.new end - config_param :apiuri, :string, :default => 'https://grassland-api.elasticbeanstalk.com/credential' - config_param :key, :string, :default => 'nil' - config_param :debug, :bool, :default => false + config_param :apiuri, :string, :default => 'https://grassland.biz/credentials' + config_param :id, :string, :default => 'nil' + config_param :key, :string, :default => 'nil' + config_param :debug, :bool, :default => false + config_param :resetCredentialTimer, :integer, :default => 86400 + # config_param :resetCredentialTimer, :integer, :default => 20 + def set_interval(delay) + Thread.new do + loop do + sleep delay + yield # call passed block + end + end + end + def configure(conf) super [:key].each do |name| unless self.instance_variable_get("@#{name}") @@ -28,29 +43,43 @@ end end def start super - setCredential - configure_aws - AWS.kinesis.client.put_record({ - :stream_name => @stream_name, - :data => "test", - :partition_key => "#{rand(999)}" - }) + set_interval(@resetCredentialTimer){ + resetAwsCredential + } + resetAwsCredential end def shutdown super end + def resetAwsCredential() + begin + setCredential + configure_aws + AWS.kinesis.client.put_record({ + :stream_name => @stream_name, + :data => "test", + :partition_key => "#{random.rand(999)}" + }) + puts "fluentd: reset credential" + rescue => e + puts [e.class, e].join(" : initialize error.") + end + end + def setCredential() credential = get_json("#{@apiuri}?key=#{@key}") @stream_name = credential['streamName'] @access_key_id = credential['accessKeyId'] @secret_access_key = credential['secretAccessKey'] @region = credential['region'] + @sessionToken = credential['SessionToken'] + @partitionKeys = credential['SessionToken'] end def get_json(location, limit = 3) raise ArgumentError, 'too many HTTP redirects' if limit == 0 uri = URI.parse(location) @@ -78,19 +107,22 @@ end end def format(tag, time, record) # print(record) - ['cid', 'dt', 'uid', 'd'].each do |key| + ['dt', 'uid', 'd'].each do |key| unless record.has_key?(key) puts "input data error: '#{key}' is required" return "" end end unless record.has_key?('pt') record['pt'] = time end + unless record.has_key?('cid') + record['cid'] = @id + end record['pk'] = record['cid'] + record['dt'] return "#{record.to_json}," end @@ -109,21 +141,23 @@ end if bufList[":#{data['pk']}"].bytesize >= 30720 then AWS.kinesis.client.put_record({ :stream_name => @stream_name, :data => "["+bufList[":#{data['pk']}"].chop+"]", - :partition_key => data['pk'] + :partition_key => partitionKeys[random.rand(partitionKeys.length)] + # :partition_key => data['pk'] }) bufList.delete(":#{data['pk']}") end end dataList.each do |data| if bufList[":#{data['pk']}"] != nil then AWS.kinesis.client.put_record({ :stream_name => @stream_name, :data => "["+bufList[":#{data['pk']}"].chop+"]", - :partition_key => data['pk'] + :partition_key => partitionKeys[random.rand(partitionKeys.length)] + # :partition_key => data['pk'] }) bufList.delete(":#{data['pk']}") end end rescue @@ -135,10 +169,11 @@ def configure_aws options = { :access_key_id => @access_key_id, :secret_access_key => @secret_access_key, - :region => @region + :region => @region, + :session_token => @sessionToken } if @debug options.update( :logger => Logger.new($log.out),