lib/fluent/plugin/out_grassland.rb in fluent-plugin-grassland-0.0.2 vs lib/fluent/plugin/out_grassland.rb in fluent-plugin-grassland-0.0.3
- old
+ new
@@ -98,33 +98,37 @@
buf = chunk.read
dataList = JSON.parse("[#{buf.chop}]")
putBuf = ""
bufList = {}
- dataList.each do |data|
- if bufList[":#{data['pk']}"] == nil then
- bufList[":#{data['pk']}"] = "#{data.to_json},"
- else
- bufList[":#{data['pk']}"] += "#{data.to_json},"
+ begin
+ dataList.each do |data|
+ if bufList[":#{data['pk']}"] == nil then
+ bufList[":#{data['pk']}"] = "#{data.to_json},"
+ else
+ bufList[":#{data['pk']}"] += "#{data.to_json},"
+ end
+ if bufList[":#{data['pk']}"].bytesize >= 30720 then
+ AWS.kinesis.client.put_record({
+ :stream_name => @stream_name,
+ :data => "["+bufList[":#{data['pk']}"].chop+"]",
+ :partition_key => data['pk']
+ })
+ bufList.delete(":#{data['pk']}")
+ end
end
- if bufList[":#{data['pk']}"].bytesize >= 30720 then
- AWS.kinesis.client.put_record({
- :stream_name => @stream_name,
- :data => "["+bufList[":#{data['pk']}"].chop+"]",
- :partition_key => data['pk']
- })
- bufList.delete(":#{data['pk']}")
+ dataList.each do |data|
+ if bufList[":#{data['pk']}"] != nil then
+ AWS.kinesis.client.put_record({
+ :stream_name => @stream_name,
+ :data => "["+bufList[":#{data['pk']}"].chop+"]",
+ :partition_key => data['pk']
+ })
+ bufList.delete(":#{data['pk']}")
+ end
end
- end
- dataList.each do |data|
- if bufList[":#{data['pk']}"] != nil then
- AWS.kinesis.client.put_record({
- :stream_name => @stream_name,
- :data => "["+bufList[":#{data['pk']}"].chop+"]",
- :partition_key => data['pk']
- })
- bufList.delete(":#{data['pk']}")
- end
+ rescue
+ puts "error: put_record to grassland. maybe too many requests. few data dropped."
end
end
private