src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.3.6 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.4.0

- old
+ new

@@ -10,23 +10,27 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.embulk.config.ConfigException; import org.embulk.config.TaskReport; import org.embulk.spi.Column; import org.embulk.spi.Exec; import org.embulk.spi.Page; import org.embulk.spi.PageReader; import org.embulk.spi.Schema; import org.embulk.spi.TransactionalPageOutput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KintonePageOutput implements TransactionalPageOutput { public static final int UPSERT_BATCH_SIZE = 10000; public static final int CHUNK_SIZE = 100; + private static final Logger LOGGER = LoggerFactory.getLogger(KintonePageOutput.class); private final PageReader pageReader; private final PluginTask task; private KintoneClient client; public KintonePageOutput(PluginTask task, Schema schema) { @@ -126,10 +130,11 @@ records.add(record); if (records.size() == CHUNK_SIZE) { client.record().addRecords(task.getAppId(), records); records.clear(); + sleep(); } } if (records.size() > 0) { client.record().addRecords(task.getAppId(), records); } @@ -169,10 +174,11 @@ record.removeField(updateKey.getField()); updateRecords.add(new RecordForUpdate(updateKey, record)); if (updateRecords.size() == CHUNK_SIZE) { client.record().updateRecords(task.getAppId(), updateRecords); updateRecords.clear(); + sleep(); } } if (updateRecords.size() > 0) { client.record().updateRecords(task.getAppId(), updateRecords); } @@ -221,11 +227,12 @@ throw new RuntimeException("kintone throw exception", e); } }); } - private void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys) { + private void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys) + throws InterruptedException { if (records.size() != updateKeys.size()) { throw new RuntimeException("records.size() != updateKeys.size()"); } FieldType updateKeyFieldType = client.app().getFormFields(task.getAppId()).get(updateKeys.get(0).getField()).getType(); @@ -267,13 +274,15 @@ } if (insertRecords.size() == CHUNK_SIZE) { client.record().addRecords(task.getAppId(), insertRecords); insertRecords.clear(); + sleep(); } else if (updateRecords.size() == CHUNK_SIZE) { client.record().updateRecords(task.getAppId(), updateRecords); updateRecords.clear(); + sleep(); } } if (insertRecords.size() > 0) { client.record().addRecords(task.getAppId(), insertRecords); } @@ -313,7 +322,16 @@ return allRecords; } private boolean existsRecord(List<String> distValues, UpdateKey updateKey) { return distValues.stream().anyMatch(v -> v.equals(updateKey.getValue().toString())); + } + + private void sleep() throws InterruptedException { + if (!task.getIntervalSeconds().isPresent()) { + return; + } + Integer interval = task.getIntervalSeconds().get(); + LOGGER.info(String.format("sleep %d seconds.", interval)); + TimeUnit.SECONDS.sleep(interval); } }