lib/fluent/plugin/out_grassland.rb in fluent-plugin-grassland-0.0.2 vs lib/fluent/plugin/out_grassland.rb in fluent-plugin-grassland-0.0.3

- old
+ new

@@ -98,33 +98,37 @@ buf = chunk.read dataList = JSON.parse("[#{buf.chop}]") putBuf = "" bufList = {} - dataList.each do |data| - if bufList[":#{data['pk']}"] == nil then - bufList[":#{data['pk']}"] = "#{data.to_json}," - else - bufList[":#{data['pk']}"] += "#{data.to_json}," + begin + dataList.each do |data| + if bufList[":#{data['pk']}"] == nil then + bufList[":#{data['pk']}"] = "#{data.to_json}," + else + bufList[":#{data['pk']}"] += "#{data.to_json}," + end + if bufList[":#{data['pk']}"].bytesize >= 30720 then + AWS.kinesis.client.put_record({ + :stream_name => @stream_name, + :data => "["+bufList[":#{data['pk']}"].chop+"]", + :partition_key => data['pk'] + }) + bufList.delete(":#{data['pk']}") + end end - if bufList[":#{data['pk']}"].bytesize >= 30720 then - AWS.kinesis.client.put_record({ - :stream_name => @stream_name, - :data => "["+bufList[":#{data['pk']}"].chop+"]", - :partition_key => data['pk'] - }) - bufList.delete(":#{data['pk']}") + dataList.each do |data| + if bufList[":#{data['pk']}"] != nil then + AWS.kinesis.client.put_record({ + :stream_name => @stream_name, + :data => "["+bufList[":#{data['pk']}"].chop+"]", + :partition_key => data['pk'] + }) + bufList.delete(":#{data['pk']}") + end end - end - dataList.each do |data| - if bufList[":#{data['pk']}"] != nil then - AWS.kinesis.client.put_record({ - :stream_name => @stream_name, - :data => "["+bufList[":#{data['pk']}"].chop+"]", - :partition_key => data['pk'] - }) - bufList.delete(":#{data['pk']}") - end + rescue + puts "error: put_record to grassland. maybe too many requests. few data dropped." end end private