src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.2.1 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.2.2
- old
+ new
@@ -15,13 +15,11 @@
import org.embulk.spi.TransactionalPageOutput;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
public class KintonePageOutput
implements TransactionalPageOutput
{
@@ -185,15 +183,19 @@
throw new RuntimeException("kintone throw exception", e);
}
});
}
- private List<Record> getAllRecords(String fieldCode)
+ private List<Record> getRecordsByUpdateKey(String fieldCode, List<String> queryValues)
{
List<Record> allRecords = new ArrayList<Record>();
List<String> fields = Arrays.asList(fieldCode);
- String cursorId = client.record().createCursor(task.getAppId(), fields, null);
+ String cursorId = client.record().createCursor(
+ task.getAppId(),
+ fields,
+ fieldCode + " in (" + String.join(",", queryValues) + ")"
+ );
while (true) {
GetRecordsByCursorResponseBody resp = client.record().getRecordsByCursor(cursorId);
List<Record> records = resp.getRecords();
allRecords.addAll(records);
@@ -204,21 +206,63 @@
return allRecords;
}
abstract class UpsertPage<T>
{
- public abstract List<T> getUpdateKeyValues();
+ public abstract List<T> getUpdateKeyValues(List<String> queryValues);
public abstract boolean existsRecord(List<T> updateKeyValues, Record record);
+ public void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys)
+ {
+ if (records.size() != updateKeys.size()) {
+ throw new RuntimeException("records.size() != updateKeys.size()");
+ }
+
+ List<String> queryValues = updateKeys
+ .stream()
+ .map(k -> "\"" + k.getValue().toString() + "\"")
+ .collect(Collectors.toList());
+ List<T> updateKeyValues = getUpdateKeyValues(queryValues);
+
+ ArrayList<Record> insertRecords = new ArrayList<>();
+ ArrayList<RecordForUpdate> updateRecords = new ArrayList<RecordForUpdate>();
+ for (int i = 0; i < records.size(); i++) {
+ Record record = records.get(i);
+ UpdateKey updateKey = updateKeys.get(i);
+
+ if (existsRecord(updateKeyValues, record)) {
+ record.removeField(updateKey.getField());
+ updateRecords.add(new RecordForUpdate(updateKey, record));
+ }
+ else {
+ insertRecords.add(record);
+ }
+
+ if (insertRecords.size() == 100) {
+ client.record().addRecords(task.getAppId(), insertRecords);
+ insertRecords.clear();
+ }
+ else if (updateRecords.size() == 100) {
+ client.record().updateRecords(task.getAppId(), updateRecords);
+ updateRecords.clear();
+ }
+ }
+ if (insertRecords.size() > 0) {
+ client.record().addRecords(task.getAppId(), insertRecords);
+ }
+ if (updateRecords.size() > 0) {
+ client.record().updateRecords(task.getAppId(), updateRecords);
+ }
+ }
+
public void run(final Page page)
{
execute(client -> {
try {
- List<T> updateKeyValues = getUpdateKeyValues();
+ ArrayList<Record> records = new ArrayList<>();
+ ArrayList<UpdateKey> updateKeys = new ArrayList<>();
- ArrayList<Record> insertRecords = new ArrayList<>();
- ArrayList<RecordForUpdate> updateRecords = new ArrayList<RecordForUpdate>();
pageReader.setPage(page);
KintoneColumnVisitor visitor = new KintoneColumnVisitor(pageReader,
task.getColumnOptions());
while (pageReader.nextRecord()) {
Record record = new Record();
@@ -226,35 +270,22 @@
visitor.setRecord(record);
visitor.setUpdateKey(updateKey);
for (Column column : pageReader.getSchema().getColumns()) {
column.visit(visitor);
}
+ records.add(record);
+ updateKeys.add(updateKey);
- if (existsRecord(updateKeyValues, record)) {
- record.removeField(updateKey.getField());
- updateRecords.add(new RecordForUpdate(updateKey, record));
+ if (records.size() == 10000) {
+ upsert(records, updateKeys);
+ records.clear();
+ updateKeys.clear();
}
- else {
- insertRecords.add(record);
- }
-
- if (insertRecords.size() == 100) {
- client.record().addRecords(task.getAppId(), insertRecords);
- insertRecords.clear();
- }
- else if (updateRecords.size() == 100) {
- client.record().updateRecords(task.getAppId(), updateRecords);
- updateRecords.clear();
- }
}
-
- if (insertRecords.size() > 0) {
- client.record().addRecords(task.getAppId(), insertRecords);
+ if (records.size() > 0) {
+ upsert(records, updateKeys);
}
- if (updateRecords.size() > 0) {
- client.record().updateRecords(task.getAppId(), updateRecords);
- }
}
catch (Exception e) {
throw new RuntimeException("kintone throw exception", e);
}
});
@@ -268,13 +299,13 @@
public UpsertPageByStringKey(String fieldCode)
{
this.fieldCode = fieldCode;
}
- public List<String> getUpdateKeyValues()
+ public List<String> getUpdateKeyValues(List<String> queryValues)
{
- return getAllRecords(fieldCode)
+ return getRecordsByUpdateKey(fieldCode, queryValues)
.stream()
.map(r -> r.getSingleLineTextFieldValue(fieldCode))
.collect(Collectors.toList());
}
@@ -291,12 +322,12 @@
public UpsertPageByNumberKey(String fieldCode)
{
this.fieldCode = fieldCode;
}
- public List<BigDecimal> getUpdateKeyValues()
+ public List<BigDecimal> getUpdateKeyValues(List<String> queryValues)
{
- return getAllRecords(fieldCode)
+ return getRecordsByUpdateKey(fieldCode, queryValues)
.stream()
.map(r -> r.getNumberFieldValue(fieldCode))
.collect(Collectors.toList());
}