src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.2.1 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.2.2

- old
+ new

@@ -15,13 +15,11 @@ import org.embulk.spi.TransactionalPageOutput; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; public class KintonePageOutput implements TransactionalPageOutput { @@ -185,15 +183,19 @@ throw new RuntimeException("kintone throw exception", e); } }); } - private List<Record> getAllRecords(String fieldCode) + private List<Record> getRecordsByUpdateKey(String fieldCode, List<String> queryValues) { List<Record> allRecords = new ArrayList<Record>(); List<String> fields = Arrays.asList(fieldCode); - String cursorId = client.record().createCursor(task.getAppId(), fields, null); + String cursorId = client.record().createCursor( + task.getAppId(), + fields, + fieldCode + " in (" + String.join(",", queryValues) + ")" + ); while (true) { GetRecordsByCursorResponseBody resp = client.record().getRecordsByCursor(cursorId); List<Record> records = resp.getRecords(); allRecords.addAll(records); @@ -204,21 +206,63 @@ return allRecords; } abstract class UpsertPage<T> { - public abstract List<T> getUpdateKeyValues(); + public abstract List<T> getUpdateKeyValues(List<String> queryValues); public abstract boolean existsRecord(List<T> updateKeyValues, Record record); + public void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys) + { + if (records.size() != updateKeys.size()) { + throw new RuntimeException("records.size() != updateKeys.size()"); + } + + List<String> queryValues = updateKeys + .stream() + .map(k -> "\"" + k.getValue().toString() + "\"") + .collect(Collectors.toList()); + List<T> updateKeyValues = getUpdateKeyValues(queryValues); + + ArrayList<Record> insertRecords = new ArrayList<>(); + ArrayList<RecordForUpdate> updateRecords = new ArrayList<RecordForUpdate>(); + for (int i = 0; i < records.size(); i++) { + Record record = records.get(i); + UpdateKey updateKey = updateKeys.get(i); + + if (existsRecord(updateKeyValues, record)) { + record.removeField(updateKey.getField()); + updateRecords.add(new RecordForUpdate(updateKey, record)); + } + else { + insertRecords.add(record); + } + + if (insertRecords.size() == 100) { + client.record().addRecords(task.getAppId(), insertRecords); + insertRecords.clear(); + } + else if (updateRecords.size() == 100) { + client.record().updateRecords(task.getAppId(), updateRecords); + updateRecords.clear(); + } + } + if (insertRecords.size() > 0) { + client.record().addRecords(task.getAppId(), insertRecords); + } + if (updateRecords.size() > 0) { + client.record().updateRecords(task.getAppId(), updateRecords); + } + } + public void run(final Page page) { execute(client -> { try { - List<T> updateKeyValues = getUpdateKeyValues(); + ArrayList<Record> records = new ArrayList<>(); + ArrayList<UpdateKey> updateKeys = new ArrayList<>(); - ArrayList<Record> insertRecords = new ArrayList<>(); - ArrayList<RecordForUpdate> updateRecords = new ArrayList<RecordForUpdate>(); pageReader.setPage(page); KintoneColumnVisitor visitor = new KintoneColumnVisitor(pageReader, task.getColumnOptions()); while (pageReader.nextRecord()) { Record record = new Record(); @@ -226,35 +270,22 @@ visitor.setRecord(record); visitor.setUpdateKey(updateKey); for (Column column : pageReader.getSchema().getColumns()) { column.visit(visitor); } + records.add(record); + updateKeys.add(updateKey); - if (existsRecord(updateKeyValues, record)) { - record.removeField(updateKey.getField()); - updateRecords.add(new RecordForUpdate(updateKey, record)); + if (records.size() == 10000) { + upsert(records, updateKeys); + records.clear(); + updateKeys.clear(); } - else { - insertRecords.add(record); - } - - if (insertRecords.size() == 100) { - client.record().addRecords(task.getAppId(), insertRecords); - insertRecords.clear(); - } - else if (updateRecords.size() == 100) { - client.record().updateRecords(task.getAppId(), updateRecords); - updateRecords.clear(); - } } - - if (insertRecords.size() > 0) { - client.record().addRecords(task.getAppId(), insertRecords); + if (records.size() > 0) { + upsert(records, updateKeys); } - if (updateRecords.size() > 0) { - client.record().updateRecords(task.getAppId(), updateRecords); - } } catch (Exception e) { throw new RuntimeException("kintone throw exception", e); } }); @@ -268,13 +299,13 @@ public UpsertPageByStringKey(String fieldCode) { this.fieldCode = fieldCode; } - public List<String> getUpdateKeyValues() + public List<String> getUpdateKeyValues(List<String> queryValues) { - return getAllRecords(fieldCode) + return getRecordsByUpdateKey(fieldCode, queryValues) .stream() .map(r -> r.getSingleLineTextFieldValue(fieldCode)) .collect(Collectors.toList()); } @@ -291,12 +322,12 @@ public UpsertPageByNumberKey(String fieldCode) { this.fieldCode = fieldCode; } - public List<BigDecimal> getUpdateKeyValues() + public List<BigDecimal> getUpdateKeyValues(List<String> queryValues) { - return getAllRecords(fieldCode) + return getRecordsByUpdateKey(fieldCode, queryValues) .stream() .map(r -> r.getNumberFieldValue(fieldCode)) .collect(Collectors.toList()); }