src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-1.0.0 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-1.1.0

- old
+ new

@@ -48,11 +48,10 @@ "GAIA_TM12", // 作成できるカーソルの上限に達しているため、カーソルを作成できません。不要なカーソルを削除するか、しばらく経ってから再実行してください。 "GAIA_RE18", // データベースのロックに失敗したため、変更を保存できませんでした。時間をおいて再度お試しください。 "GAIA_DA02" // データベースのロックに失敗したため、変更を保存できませんでした。時間をおいて再度お試しください。 ); private static final int UPSERT_BATCH_SIZE = 10000; - private static final int CHUNK_SIZE = 100; private final Map<String, Pair<FieldType, FieldType>> wrongTypeFields = new TreeMap<>(); private final PluginTask task; private final PageReader reader; private KintoneClient client; private Map<String, FieldProperty> formFields; @@ -111,11 +110,11 @@ "Field type of %s is expected %s but actual %s", key, value.getLeft(), value.getRight()))); return Exec.newTaskReport(); } - public void connectIfNeeded() { + private void connectIfNeeded() { if (client != null) { return; // Already connected } KintoneClientBuilder builder = KintoneClientBuilder.create("https://" + task.getDomain()); if (task.getGuestSpaceId().isPresent()) { @@ -201,18 +200,23 @@ private void insertPage(Page page) { List<Record> records = new ArrayList<>(); reader.setPage(page); KintoneColumnVisitor visitor = new KintoneColumnVisitor( - reader, task.getColumnOptions(), task.getPreferNulls(), task.getIgnoreNulls()); + reader, + task.getDerivedColumns(), + task.getColumnOptions(), + task.getPreferNulls(), + task.getIgnoreNulls(), + task.getReduceKeyName().orElse(null)); while (reader.nextRecord()) { Record record = new Record(); visitor.setRecord(record); reader.getSchema().visitColumns(visitor); putWrongTypeFields(record); records.add(record); - if (records.size() == CHUNK_SIZE) { + if (records.size() == task.getChunkSize()) { insert(records); records.clear(); } } if (!records.isEmpty()) { @@ -224,13 +228,15 @@ List<RecordForUpdate> records = new ArrayList<>(); reader.setPage(page); KintoneColumnVisitor visitor = new KintoneColumnVisitor( reader, + task.getDerivedColumns(), task.getColumnOptions(), task.getPreferNulls(), task.getIgnoreNulls(), + task.getReduceKeyName().orElse(null), task.getUpdateKeyName() .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated while (reader.nextRecord()) { Record record = new Record(); UpdateKey updateKey = new UpdateKey(); @@ -241,11 +247,11 @@ if (updateKey.getValue() == null || updateKey.getValue().toString().isEmpty()) { LOGGER.warn("Record skipped because no update key value was specified"); continue; } records.add(new RecordForUpdate(updateKey, record.removeField(updateKey.getField()))); - if (records.size() == CHUNK_SIZE) { + if (records.size() == task.getChunkSize()) { update(records); records.clear(); } } if (!records.isEmpty()) { @@ -258,13 +264,15 @@ List<UpdateKey> updateKeys = new ArrayList<>(); reader.setPage(page); KintoneColumnVisitor visitor = new KintoneColumnVisitor( reader, + task.getDerivedColumns(), task.getColumnOptions(), task.getPreferNulls(), task.getIgnoreNulls(), + task.getReduceKeyName().orElse(null), task.getUpdateKeyName() .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated while (reader.nextRecord()) { Record record = new Record(); UpdateKey updateKey = new UpdateKey(); @@ -298,14 +306,14 @@ if (existsRecord(existingValues, updateKey)) { updateRecords.add(new RecordForUpdate(updateKey, record.removeField(updateKey.getField()))); } else { insertRecords.add(record); } - if (insertRecords.size() == CHUNK_SIZE) { + if (insertRecords.size() == task.getChunkSize()) { insert(insertRecords); insertRecords.clear(); - } else if (updateRecords.size() == CHUNK_SIZE) { + } else if (updateRecords.size() == task.getChunkSize()) { update(updateRecords); updateRecords.clear(); } } if (!insertRecords.isEmpty()) { @@ -363,15 +371,10 @@ .map(fieldValueAsString) .filter(Objects::nonNull) .collect(Collectors.toList()); } - private boolean existsRecord(List<String> existingValues, UpdateKey updateKey) { - String value = toString(updateKey.getValue()); - return value != null && existingValues.stream().anyMatch(v -> v.equals(value)); - } - private void putWrongTypeFields(Record record) { record.getFieldCodes(true).stream() .map( fieldCode -> Maps.immutableEntry( @@ -382,9 +385,14 @@ private FieldType getFieldType(String fieldCode) { connectIfNeeded(); FieldProperty field = formFields.get(fieldCode); return field == null ? null : field.getType(); + } + + private static boolean existsRecord(List<String> existingValues, UpdateKey updateKey) { + String value = toString(updateKey.getValue()); + return value != null && existingValues.stream().anyMatch(v -> v.equals(value)); } private static String toString(Object value) { return value == null ? null