src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-1.1.0 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-1.2.0

- old
+ new

@@ -3,19 +3,15 @@ import static org.embulk.spi.util.RetryExecutor.retryExecutor; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; -import com.kintone.client.KintoneClient; -import com.kintone.client.KintoneClientBuilder; import com.kintone.client.api.record.GetRecordsByCursorResponseBody; import com.kintone.client.exception.KintoneApiRuntimeException; -import com.kintone.client.model.app.field.FieldProperty; import com.kintone.client.model.record.FieldType; import com.kintone.client.model.record.Record; import com.kintone.client.model.record.RecordForUpdate; -import com.kintone.client.model.record.UpdateKey; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; @@ -26,12 +22,15 @@ import java.util.TreeMap; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; -import org.embulk.config.ConfigException; import org.embulk.config.TaskReport; +import org.embulk.output.kintone.record.Id; +import org.embulk.output.kintone.record.IdOrUpdateKey; +import org.embulk.output.kintone.record.Skip; +import org.embulk.output.kintone.util.Lazy; import org.embulk.spi.Exec; import org.embulk.spi.Page; import org.embulk.spi.PageReader; import org.embulk.spi.Schema; import org.embulk.spi.TransactionalPageOutput; @@ -51,51 +50,31 @@ ); private static final int UPSERT_BATCH_SIZE = 10000; private final Map<String, Pair<FieldType, FieldType>> wrongTypeFields = new TreeMap<>(); private final PluginTask task; private final PageReader reader; - private KintoneClient client; - private Map<String, FieldProperty> formFields; + private final Lazy<KintoneClient> client; public KintonePageOutput(PluginTask task, Schema schema) { this.task = task; reader = new PageReader(schema); + client = KintoneClient.lazy(() -> task, schema); } @Override public void add(Page page) { - KintoneMode mode = KintoneMode.getKintoneModeByValue(task.getMode()); - switch (mode) { - case INSERT: - insertPage(page); - break; - case UPDATE: - updatePage(page); - break; - case UPSERT: - upsertPage(page); - break; - default: - throw new UnsupportedOperationException(String.format("Unknown mode '%s'", task.getMode())); - } + KintoneMode.of(task).add(page, task.getSkipIfNonExistingIdOrUpdateKey(), this); } @Override public void finish() { // noop } @Override public void close() { - if (client == null) { - return; // Not connected - } - try { - client.close(); - } catch (Exception e) { - throw new RuntimeException("kintone throw exception", e); - } + client.get().close(); } @Override public void abort() { // noop @@ -110,52 +89,29 @@ "Field type of %s is expected %s but actual %s", key, value.getLeft(), value.getRight()))); return Exec.newTaskReport(); } - private void connectIfNeeded() { - if (client != null) { - return; // Already connected - } - KintoneClientBuilder builder = KintoneClientBuilder.create("https://" + task.getDomain()); - if (task.getGuestSpaceId().isPresent()) { - builder.setGuestSpaceId(task.getGuestSpaceId().get()); - } - if (task.getBasicAuthUsername().isPresent() && task.getBasicAuthPassword().isPresent()) { - builder.withBasicAuth(task.getBasicAuthUsername().get(), task.getBasicAuthPassword().get()); - } - if (task.getUsername().isPresent() && task.getPassword().isPresent()) { - builder.authByPassword(task.getUsername().get(), task.getPassword().get()); - } else if (task.getToken().isPresent()) { - builder.authByApiToken(task.getToken().get()); - } else { - throw new ConfigException("Username and password or token must be configured."); - } - client = builder.build(); - formFields = client.app().getFormFields(task.getAppId()); - } - private void insert(List<Record> records) { - executeWithRetry(() -> client.record().addRecords(task.getAppId(), records)); + executeWithRetry(() -> client.get().record().addRecords(task.getAppId(), records)); } private void update(List<RecordForUpdate> records) { - executeWithRetry(() -> client.record().updateRecords(task.getAppId(), records)); + executeWithRetry(() -> client.get().record().updateRecords(task.getAppId(), records)); } private <T> T executeWithRetry(Supplier<T> operation) { - connectIfNeeded(); KintoneRetryOption retryOption = task.getRetryOptions(); try { return retryExecutor() .withRetryLimit(retryOption.getLimit()) .withInitialRetryWait(retryOption.getInitialWaitMillis()) .withMaxRetryWait(retryOption.getMaxWaitMillis()) .runInterruptible( new Retryable<T>() { @Override - public T call() throws Exception { + public T call() { return operation.get(); } @Override public boolean isRetryableException(Exception exception) { @@ -173,12 +129,11 @@ } } @Override public void onRetry( - Exception exception, int retryCount, int retryLimit, int retryWait) - throws RetryGiveupException { + Exception exception, int retryCount, int retryLimit, int retryWait) { String message = String.format( "Retrying %d/%d after %d seconds. Message: %s", retryCount, retryLimit, retryWait / 1000, exception.getMessage()); if (retryCount % 3 == 0) { @@ -187,19 +142,18 @@ LOGGER.warn(message); } } @Override - public void onGiveup(Exception firstException, Exception lastException) - throws RetryGiveupException {} + public void onGiveup(Exception firstException, Exception lastException) {} }); } catch (RetryGiveupException | InterruptedException e) { throw new RuntimeException("kintone throw exception", e); } } - private void insertPage(Page page) { + public void insertPage(Page page) { List<Record> records = new ArrayList<>(); reader.setPage(page); KintoneColumnVisitor visitor = new KintoneColumnVisitor( reader, @@ -222,91 +176,125 @@ if (!records.isEmpty()) { insert(records); } } - private void updatePage(Page page) { + public void updatePage(Page page) { + Skip skip = task.getSkipIfNonExistingIdOrUpdateKey(); List<RecordForUpdate> records = new ArrayList<>(); reader.setPage(page); KintoneColumnVisitor visitor = new KintoneColumnVisitor( reader, task.getDerivedColumns(), task.getColumnOptions(), task.getPreferNulls(), task.getIgnoreNulls(), task.getReduceKeyName().orElse(null), - task.getUpdateKeyName() - .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated + task.getUpdateKeyName().orElse(Id.FIELD)); while (reader.nextRecord()) { Record record = new Record(); - UpdateKey updateKey = new UpdateKey(); + IdOrUpdateKey idOrUpdateKey = new IdOrUpdateKey(); visitor.setRecord(record); - visitor.setUpdateKey(updateKey); + visitor.setIdOrUpdateKey(idOrUpdateKey); reader.getSchema().visitColumns(visitor); putWrongTypeFields(record); - if (updateKey.getValue() == null || updateKey.getValue().toString().isEmpty()) { - LOGGER.warn("Record skipped because no update key value was specified"); + if (skip == Skip.NEVER && !idOrUpdateKey.isPresent()) { + throw new RuntimeException("No id or update key value was specified"); + } else if (!idOrUpdateKey.isPresent()) { + LOGGER.warn("Record skipped because no id or update key value was specified"); continue; } - records.add(new RecordForUpdate(updateKey, record.removeField(updateKey.getField()))); + records.add(idOrUpdateKey.forUpdate(record)); if (records.size() == task.getChunkSize()) { update(records); records.clear(); } } if (!records.isEmpty()) { update(records); } } - private void upsertPage(Page page) { + public void upsertPage(Page page) { List<Record> records = new ArrayList<>(); - List<UpdateKey> updateKeys = new ArrayList<>(); + List<IdOrUpdateKey> idOrUpdateKeys = new ArrayList<>(); reader.setPage(page); KintoneColumnVisitor visitor = new KintoneColumnVisitor( reader, task.getDerivedColumns(), task.getColumnOptions(), task.getPreferNulls(), task.getIgnoreNulls(), task.getReduceKeyName().orElse(null), - task.getUpdateKeyName() - .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated + task.getUpdateKeyName().orElse(Id.FIELD)); while (reader.nextRecord()) { Record record = new Record(); - UpdateKey updateKey = new UpdateKey(); + IdOrUpdateKey idOrUpdateKey = new IdOrUpdateKey(); visitor.setRecord(record); - visitor.setUpdateKey(updateKey); + visitor.setIdOrUpdateKey(idOrUpdateKey); reader.getSchema().visitColumns(visitor); putWrongTypeFields(record); records.add(record); - updateKeys.add(updateKey); + idOrUpdateKeys.add(idOrUpdateKey); if (records.size() == UPSERT_BATCH_SIZE) { - upsert(records, updateKeys); + upsert(records, idOrUpdateKeys); records.clear(); - updateKeys.clear(); + idOrUpdateKeys.clear(); } } if (!records.isEmpty()) { - upsert(records, updateKeys); + upsert(records, idOrUpdateKeys); } } - private void upsert(List<Record> records, List<UpdateKey> updateKeys) { - if (records.size() != updateKeys.size()) { - throw new RuntimeException("records.size() != updateKeys.size()"); + private void upsert(List<Record> records, List<IdOrUpdateKey> idOrUpdateKeys) { + if (records.size() != idOrUpdateKeys.size()) { + throw new RuntimeException("records.size() != idOrUpdateKeys.size()"); } - List<String> existingValues = executeWithRetry(() -> getExistingValuesByUpdateKey(updateKeys)); + Skip skip = task.getSkipIfNonExistingIdOrUpdateKey(); + String columnName = task.getUpdateKeyName().orElse(Id.FIELD); + boolean isId = columnName.equals(Id.FIELD); + List<String> existingValues = + executeWithRetry(() -> getExistingValuesByIdOrUpdateKey(idOrUpdateKeys, columnName)); List<Record> insertRecords = new ArrayList<>(); List<RecordForUpdate> updateRecords = new ArrayList<>(); for (int i = 0; i < records.size(); i++) { + RecordForUpdate recordForUpdate = null; Record record = records.get(i); - UpdateKey updateKey = updateKeys.get(i); - if (existsRecord(existingValues, updateKey)) { - updateRecords.add(new RecordForUpdate(updateKey, record.removeField(updateKey.getField()))); + IdOrUpdateKey idOrUpdateKey = idOrUpdateKeys.get(i); + if (existsRecord(existingValues, idOrUpdateKey)) { + recordForUpdate = idOrUpdateKey.forUpdate(record); + } else if (skip == Skip.ALWAYS && idOrUpdateKey.isPresent()) { + LOGGER.warn( + "Record skipped because non existing id or update key '" + + idOrUpdateKey.getValue() + + "' was specified"); + continue; + } else if (skip == Skip.ALWAYS && !idOrUpdateKey.isPresent()) { + LOGGER.warn("Record skipped because no id or update key value was specified"); + continue; + } else if (skip == Skip.AUTO && idOrUpdateKey.isIdPresent()) { + LOGGER.warn( + "Record skipped because non existing id '" + + idOrUpdateKey.getValue() + + "' was specified"); + continue; + } else if (skip == Skip.AUTO && !isId && !idOrUpdateKey.isUpdateKeyPresent()) { + LOGGER.warn("Record skipped because no update key value was specified"); + continue; + } else if (idOrUpdateKey.isIdPresent()) { + LOGGER.warn( + "Record inserted though non existing id '" + + idOrUpdateKey.getValue() + + "' was specified"); + } else if (!isId && !idOrUpdateKey.isUpdateKeyPresent()) { + LOGGER.warn("Record inserted though no update key value was specified"); + } + if (recordForUpdate != null) { + updateRecords.add(recordForUpdate); } else { insertRecords.add(record); } if (insertRecords.size() == task.getChunkSize()) { insert(insertRecords); @@ -322,55 +310,57 @@ if (!updateRecords.isEmpty()) { update(updateRecords); } } - private List<String> getExistingValuesByUpdateKey(List<UpdateKey> updateKeys) { - String fieldCode = - updateKeys.stream() - .map(UpdateKey::getField) - .filter(Objects::nonNull) - .findFirst() - .orElse(null); - if (fieldCode == null) { - return Collections.emptyList(); - } - Function<Record, String> fieldValueAsString; - FieldType fieldType = getFieldType(fieldCode); - if (fieldType == FieldType.SINGLE_LINE_TEXT) { - fieldValueAsString = record -> record.getSingleLineTextFieldValue(fieldCode); - } else if (fieldType == FieldType.NUMBER) { - fieldValueAsString = record -> toString(record.getNumberFieldValue(fieldCode)); - } else { - throw new ConfigException("The update_key must be 'SINGLE_LINE_TEXT' or 'NUMBER'."); - } + private List<String> getExistingValuesByIdOrUpdateKey( + List<IdOrUpdateKey> idOrUpdateKeys, String columnName) { List<String> queryValues = - updateKeys.stream() - .filter(k -> k.getValue() != null && !k.getValue().toString().isEmpty()) + idOrUpdateKeys.stream() + .filter(IdOrUpdateKey::isPresent) .map(k -> "\"" + k.getValue() + "\"") .collect(Collectors.toList()); if (queryValues.isEmpty()) { return Collections.emptyList(); } + return columnName.equals(Id.FIELD) + ? getExistingValuesById(queryValues) + : getExistingValuesByUpdateKey(columnName, queryValues); + } + + private List<String> getExistingValuesById(List<String> queryValues) { + return getExistingValues(Id.FIELD, Record::getId, queryValues); + } + + private List<String> getExistingValuesByUpdateKey(String columnName, List<String> queryValues) { + KintoneColumnOption option = task.getColumnOptions().get(columnName); + String fieldCode = option != null ? option.getFieldCode() : columnName; + KintoneColumnType type = KintoneColumnType.valueOf(getFieldType(fieldCode).name()); + return getExistingValues(fieldCode, record -> type.getValue(record, fieldCode), queryValues); + } + + private List<String> getExistingValues( + String fieldCode, Function<Record, Object> toValue, List<String> queryValues) { String cursorId = client + .get() .record() .createCursor( task.getAppId(), Collections.singletonList(fieldCode), fieldCode + " in (" + String.join(",", queryValues) + ")"); - List<Record> allRecords = new ArrayList<>(); + List<Record> records = new ArrayList<>(); while (true) { - GetRecordsByCursorResponseBody resp = client.record().getRecordsByCursor(cursorId); - List<Record> records = resp.getRecords(); - allRecords.addAll(records); - if (!resp.hasNext()) { + GetRecordsByCursorResponseBody cursor = client.get().record().getRecordsByCursor(cursorId); + records.addAll(cursor.getRecords()); + if (!cursor.hasNext()) { break; } } - return allRecords.stream() - .map(fieldValueAsString) + return records.stream() + .map(toValue) + .map(KintonePageOutput::toString) .filter(Objects::nonNull) .collect(Collectors.toList()); } private void putWrongTypeFields(Record record) { @@ -382,16 +372,14 @@ .filter(entry -> entry.getValue().getLeft() != entry.getValue().getRight()) .forEach(entry -> wrongTypeFields.put(entry.getKey(), entry.getValue())); } private FieldType getFieldType(String fieldCode) { - connectIfNeeded(); - FieldProperty field = formFields.get(fieldCode); - return field == null ? null : field.getType(); + return client.get().getFieldType(fieldCode); } - private static boolean existsRecord(List<String> existingValues, UpdateKey updateKey) { - String value = toString(updateKey.getValue()); + private static boolean existsRecord(List<String> existingValues, IdOrUpdateKey idOrUpdateKey) { + String value = toString(idOrUpdateKey.getValue()); return value != null && existingValues.stream().anyMatch(v -> v.equals(value)); } private static String toString(Object value) { return value == null