src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.3.6 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.4.0
- old
+ new
@@ -10,23 +10,27 @@
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.embulk.config.ConfigException;
import org.embulk.config.TaskReport;
import org.embulk.spi.Column;
import org.embulk.spi.Exec;
import org.embulk.spi.Page;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class KintonePageOutput implements TransactionalPageOutput {
public static final int UPSERT_BATCH_SIZE = 10000;
public static final int CHUNK_SIZE = 100;
+ private static final Logger LOGGER = LoggerFactory.getLogger(KintonePageOutput.class);
private final PageReader pageReader;
private final PluginTask task;
private KintoneClient client;
public KintonePageOutput(PluginTask task, Schema schema) {
@@ -126,10 +130,11 @@
records.add(record);
if (records.size() == CHUNK_SIZE) {
client.record().addRecords(task.getAppId(), records);
records.clear();
+ sleep();
}
}
if (records.size() > 0) {
client.record().addRecords(task.getAppId(), records);
}
@@ -169,10 +174,11 @@
record.removeField(updateKey.getField());
updateRecords.add(new RecordForUpdate(updateKey, record));
if (updateRecords.size() == CHUNK_SIZE) {
client.record().updateRecords(task.getAppId(), updateRecords);
updateRecords.clear();
+ sleep();
}
}
if (updateRecords.size() > 0) {
client.record().updateRecords(task.getAppId(), updateRecords);
}
@@ -221,11 +227,12 @@
throw new RuntimeException("kintone throw exception", e);
}
});
}
- private void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys) {
+ private void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys)
+ throws InterruptedException {
if (records.size() != updateKeys.size()) {
throw new RuntimeException("records.size() != updateKeys.size()");
}
FieldType updateKeyFieldType =
client.app().getFormFields(task.getAppId()).get(updateKeys.get(0).getField()).getType();
@@ -267,13 +274,15 @@
}
if (insertRecords.size() == CHUNK_SIZE) {
client.record().addRecords(task.getAppId(), insertRecords);
insertRecords.clear();
+ sleep();
} else if (updateRecords.size() == CHUNK_SIZE) {
client.record().updateRecords(task.getAppId(), updateRecords);
updateRecords.clear();
+ sleep();
}
}
if (insertRecords.size() > 0) {
client.record().addRecords(task.getAppId(), insertRecords);
}
@@ -313,7 +322,16 @@
return allRecords;
}
private boolean existsRecord(List<String> distValues, UpdateKey updateKey) {
return distValues.stream().anyMatch(v -> v.equals(updateKey.getValue().toString()));
+ }
+
+ private void sleep() throws InterruptedException {
+ if (!task.getIntervalSeconds().isPresent()) {
+ return;
+ }
+ Integer interval = task.getIntervalSeconds().get();
+ LOGGER.info(String.format("sleep %d seconds.", interval));
+ TimeUnit.SECONDS.sleep(interval);
}
}