lib/fluent/plugin/out_grassland.rb in fluent-plugin-grassland-0.2.0 vs lib/fluent/plugin/out_grassland.rb in fluent-plugin-grassland-0.2.1

- old
+ new

@@ -1,9 +1,12 @@ -module Fluent - class GrasslandOutput < Fluent::BufferedOutput - Fluent::Plugin.register_output('grassland', self) +require 'syslog/logger' +require 'fluent/plugin/output' +module Fluent::Plugin + class GrasslandOutput < Fluent::Plugin::Output + Fluent::Plugin.register_output("grassland", self) + attr_accessor :random attr_accessor :kinesis attr_accessor :id, :stream_name, :access_key_id, :secret_access_key, :region, :sessionToken, :partitionKeys def initialize @@ -14,10 +17,14 @@ require 'json' require 'logger' require 'net/http' require 'uri' @random = Random.new + + log = Syslog::Logger.new 'grasslandplugin' + log.info 'grassland initialize' + # puts "grassland initialize" end config_param :apiuri, :string, :default => 'https://grassland.biz/credentials' # config_param :id, :string, :default => 'nil' config_param :key, :string, :default => 'nil' @@ -58,18 +65,18 @@ def resetAwsCredential() begin setCredential configure_aws - @kinesis.client.put_record({ + @kinesis.put_record({ :stream_name => @stream_name, :data => "test", :partition_key => "#{random.rand(999)}" }) - puts "fluentd: reset credential" + log.info "grassland: reset credential" rescue => e - puts [e.class, e].join(" : initialize error.") + log.info [e.class, e].join(" : initialize error.") end end def setCredential() credential = get_json("#{@apiuri}?key=#{@key}") @@ -98,24 +105,24 @@ when Net::HTTPRedirection location = response['location'] warn "redirected to #{location}" get_json(location, limit - 1) else - puts [uri.to_s, response.value].join(" : ") + log.info [uri.to_s, response.value].join(" : ") # handle error end rescue => e - puts [uri.to_s, e.class, e].join(" : ") + log.info [uri.to_s, e.class, e].join(" : ") # handle error end end def format(tag, time, record) # print(record) ['dt', 'd'].each do |key| unless record.has_key?(key) - puts "input data error: '#{key}' is required" + log.info "input data error: '#{key}' is required" return "" end end unless record.has_key?('pt') record['pt'] = time @@ -137,38 +144,40 @@ putBuf = "" bufList = {} begin dataList.each do |data| + # debug log + # log.info data.to_json if bufList[":#{data['pk']}"] == nil then bufList[":#{data['pk']}"] = "#{data.to_json}," else bufList[":#{data['pk']}"] += "#{data.to_json}," end if bufList[":#{data['pk']}"].bytesize >= 30720 then - @kinesis.client.put_record({ + @kinesis.put_record({ :stream_name => @stream_name, :data => "["+bufList[":#{data['pk']}"].chop+"]", :partition_key => partitionKeys[random.rand(partitionKeys.length)] # :partition_key => data['pk'] }) bufList.delete(":#{data['pk']}") end end dataList.each do |data| if bufList[":#{data['pk']}"] != nil then - @kinesis.client.put_record({ + @kinesis.put_record({ :stream_name => @stream_name, :data => "["+bufList[":#{data['pk']}"].chop+"]", :partition_key => partitionKeys[random.rand(partitionKeys.length)] # :partition_key => data['pk'] }) bufList.delete(":#{data['pk']}") end end rescue - puts "error: put_record to grassland. maybe too many requests. few data dropped." + log.info "error: put_record to grassland. maybe too many requests. few data dropped." end end private @@ -186,10 +195,9 @@ :log_level => :debug, #http_wire_trace => true ) end - @kinesis = AWS::Kinesis::Client.new(options) - # AWS.config(options) + @kinesis = Aws::Kinesis::Client.new(options) end end -end + end