src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-1.1.0 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-1.2.0
- old
+ new
@@ -3,19 +3,15 @@
import static org.embulk.spi.util.RetryExecutor.retryExecutor;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
-import com.kintone.client.KintoneClient;
-import com.kintone.client.KintoneClientBuilder;
import com.kintone.client.api.record.GetRecordsByCursorResponseBody;
import com.kintone.client.exception.KintoneApiRuntimeException;
-import com.kintone.client.model.app.field.FieldProperty;
import com.kintone.client.model.record.FieldType;
import com.kintone.client.model.record.Record;
import com.kintone.client.model.record.RecordForUpdate;
-import com.kintone.client.model.record.UpdateKey;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
@@ -26,12 +22,15 @@
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
-import org.embulk.config.ConfigException;
import org.embulk.config.TaskReport;
+import org.embulk.output.kintone.record.Id;
+import org.embulk.output.kintone.record.IdOrUpdateKey;
+import org.embulk.output.kintone.record.Skip;
+import org.embulk.output.kintone.util.Lazy;
import org.embulk.spi.Exec;
import org.embulk.spi.Page;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
@@ -51,51 +50,31 @@
);
private static final int UPSERT_BATCH_SIZE = 10000;
private final Map<String, Pair<FieldType, FieldType>> wrongTypeFields = new TreeMap<>();
private final PluginTask task;
private final PageReader reader;
- private KintoneClient client;
- private Map<String, FieldProperty> formFields;
+ private final Lazy<KintoneClient> client;
public KintonePageOutput(PluginTask task, Schema schema) {
this.task = task;
reader = new PageReader(schema);
+ client = KintoneClient.lazy(() -> task, schema);
}
@Override
public void add(Page page) {
- KintoneMode mode = KintoneMode.getKintoneModeByValue(task.getMode());
- switch (mode) {
- case INSERT:
- insertPage(page);
- break;
- case UPDATE:
- updatePage(page);
- break;
- case UPSERT:
- upsertPage(page);
- break;
- default:
- throw new UnsupportedOperationException(String.format("Unknown mode '%s'", task.getMode()));
- }
+ KintoneMode.of(task).add(page, task.getSkipIfNonExistingIdOrUpdateKey(), this);
}
@Override
public void finish() {
// noop
}
@Override
public void close() {
- if (client == null) {
- return; // Not connected
- }
- try {
- client.close();
- } catch (Exception e) {
- throw new RuntimeException("kintone throw exception", e);
- }
+ client.get().close();
}
@Override
public void abort() {
// noop
@@ -110,52 +89,29 @@
"Field type of %s is expected %s but actual %s",
key, value.getLeft(), value.getRight())));
return Exec.newTaskReport();
}
- private void connectIfNeeded() {
- if (client != null) {
- return; // Already connected
- }
- KintoneClientBuilder builder = KintoneClientBuilder.create("https://" + task.getDomain());
- if (task.getGuestSpaceId().isPresent()) {
- builder.setGuestSpaceId(task.getGuestSpaceId().get());
- }
- if (task.getBasicAuthUsername().isPresent() && task.getBasicAuthPassword().isPresent()) {
- builder.withBasicAuth(task.getBasicAuthUsername().get(), task.getBasicAuthPassword().get());
- }
- if (task.getUsername().isPresent() && task.getPassword().isPresent()) {
- builder.authByPassword(task.getUsername().get(), task.getPassword().get());
- } else if (task.getToken().isPresent()) {
- builder.authByApiToken(task.getToken().get());
- } else {
- throw new ConfigException("Username and password or token must be configured.");
- }
- client = builder.build();
- formFields = client.app().getFormFields(task.getAppId());
- }
-
private void insert(List<Record> records) {
- executeWithRetry(() -> client.record().addRecords(task.getAppId(), records));
+ executeWithRetry(() -> client.get().record().addRecords(task.getAppId(), records));
}
private void update(List<RecordForUpdate> records) {
- executeWithRetry(() -> client.record().updateRecords(task.getAppId(), records));
+ executeWithRetry(() -> client.get().record().updateRecords(task.getAppId(), records));
}
private <T> T executeWithRetry(Supplier<T> operation) {
- connectIfNeeded();
KintoneRetryOption retryOption = task.getRetryOptions();
try {
return retryExecutor()
.withRetryLimit(retryOption.getLimit())
.withInitialRetryWait(retryOption.getInitialWaitMillis())
.withMaxRetryWait(retryOption.getMaxWaitMillis())
.runInterruptible(
new Retryable<T>() {
@Override
- public T call() throws Exception {
+ public T call() {
return operation.get();
}
@Override
public boolean isRetryableException(Exception exception) {
@@ -173,12 +129,11 @@
}
}
@Override
public void onRetry(
- Exception exception, int retryCount, int retryLimit, int retryWait)
- throws RetryGiveupException {
+ Exception exception, int retryCount, int retryLimit, int retryWait) {
String message =
String.format(
"Retrying %d/%d after %d seconds. Message: %s",
retryCount, retryLimit, retryWait / 1000, exception.getMessage());
if (retryCount % 3 == 0) {
@@ -187,19 +142,18 @@
LOGGER.warn(message);
}
}
@Override
- public void onGiveup(Exception firstException, Exception lastException)
- throws RetryGiveupException {}
+ public void onGiveup(Exception firstException, Exception lastException) {}
});
} catch (RetryGiveupException | InterruptedException e) {
throw new RuntimeException("kintone throw exception", e);
}
}
- private void insertPage(Page page) {
+ public void insertPage(Page page) {
List<Record> records = new ArrayList<>();
reader.setPage(page);
KintoneColumnVisitor visitor =
new KintoneColumnVisitor(
reader,
@@ -222,91 +176,125 @@
if (!records.isEmpty()) {
insert(records);
}
}
- private void updatePage(Page page) {
+ public void updatePage(Page page) {
+ Skip skip = task.getSkipIfNonExistingIdOrUpdateKey();
List<RecordForUpdate> records = new ArrayList<>();
reader.setPage(page);
KintoneColumnVisitor visitor =
new KintoneColumnVisitor(
reader,
task.getDerivedColumns(),
task.getColumnOptions(),
task.getPreferNulls(),
task.getIgnoreNulls(),
task.getReduceKeyName().orElse(null),
- task.getUpdateKeyName()
- .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated
+ task.getUpdateKeyName().orElse(Id.FIELD));
while (reader.nextRecord()) {
Record record = new Record();
- UpdateKey updateKey = new UpdateKey();
+ IdOrUpdateKey idOrUpdateKey = new IdOrUpdateKey();
visitor.setRecord(record);
- visitor.setUpdateKey(updateKey);
+ visitor.setIdOrUpdateKey(idOrUpdateKey);
reader.getSchema().visitColumns(visitor);
putWrongTypeFields(record);
- if (updateKey.getValue() == null || updateKey.getValue().toString().isEmpty()) {
- LOGGER.warn("Record skipped because no update key value was specified");
+ if (skip == Skip.NEVER && !idOrUpdateKey.isPresent()) {
+ throw new RuntimeException("No id or update key value was specified");
+ } else if (!idOrUpdateKey.isPresent()) {
+ LOGGER.warn("Record skipped because no id or update key value was specified");
continue;
}
- records.add(new RecordForUpdate(updateKey, record.removeField(updateKey.getField())));
+ records.add(idOrUpdateKey.forUpdate(record));
if (records.size() == task.getChunkSize()) {
update(records);
records.clear();
}
}
if (!records.isEmpty()) {
update(records);
}
}
- private void upsertPage(Page page) {
+ public void upsertPage(Page page) {
List<Record> records = new ArrayList<>();
- List<UpdateKey> updateKeys = new ArrayList<>();
+ List<IdOrUpdateKey> idOrUpdateKeys = new ArrayList<>();
reader.setPage(page);
KintoneColumnVisitor visitor =
new KintoneColumnVisitor(
reader,
task.getDerivedColumns(),
task.getColumnOptions(),
task.getPreferNulls(),
task.getIgnoreNulls(),
task.getReduceKeyName().orElse(null),
- task.getUpdateKeyName()
- .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated
+ task.getUpdateKeyName().orElse(Id.FIELD));
while (reader.nextRecord()) {
Record record = new Record();
- UpdateKey updateKey = new UpdateKey();
+ IdOrUpdateKey idOrUpdateKey = new IdOrUpdateKey();
visitor.setRecord(record);
- visitor.setUpdateKey(updateKey);
+ visitor.setIdOrUpdateKey(idOrUpdateKey);
reader.getSchema().visitColumns(visitor);
putWrongTypeFields(record);
records.add(record);
- updateKeys.add(updateKey);
+ idOrUpdateKeys.add(idOrUpdateKey);
if (records.size() == UPSERT_BATCH_SIZE) {
- upsert(records, updateKeys);
+ upsert(records, idOrUpdateKeys);
records.clear();
- updateKeys.clear();
+ idOrUpdateKeys.clear();
}
}
if (!records.isEmpty()) {
- upsert(records, updateKeys);
+ upsert(records, idOrUpdateKeys);
}
}
- private void upsert(List<Record> records, List<UpdateKey> updateKeys) {
- if (records.size() != updateKeys.size()) {
- throw new RuntimeException("records.size() != updateKeys.size()");
+ private void upsert(List<Record> records, List<IdOrUpdateKey> idOrUpdateKeys) {
+ if (records.size() != idOrUpdateKeys.size()) {
+ throw new RuntimeException("records.size() != idOrUpdateKeys.size()");
}
- List<String> existingValues = executeWithRetry(() -> getExistingValuesByUpdateKey(updateKeys));
+ Skip skip = task.getSkipIfNonExistingIdOrUpdateKey();
+ String columnName = task.getUpdateKeyName().orElse(Id.FIELD);
+ boolean isId = columnName.equals(Id.FIELD);
+ List<String> existingValues =
+ executeWithRetry(() -> getExistingValuesByIdOrUpdateKey(idOrUpdateKeys, columnName));
List<Record> insertRecords = new ArrayList<>();
List<RecordForUpdate> updateRecords = new ArrayList<>();
for (int i = 0; i < records.size(); i++) {
+ RecordForUpdate recordForUpdate = null;
Record record = records.get(i);
- UpdateKey updateKey = updateKeys.get(i);
- if (existsRecord(existingValues, updateKey)) {
- updateRecords.add(new RecordForUpdate(updateKey, record.removeField(updateKey.getField())));
+ IdOrUpdateKey idOrUpdateKey = idOrUpdateKeys.get(i);
+ if (existsRecord(existingValues, idOrUpdateKey)) {
+ recordForUpdate = idOrUpdateKey.forUpdate(record);
+ } else if (skip == Skip.ALWAYS && idOrUpdateKey.isPresent()) {
+ LOGGER.warn(
+ "Record skipped because non existing id or update key '"
+ + idOrUpdateKey.getValue()
+ + "' was specified");
+ continue;
+ } else if (skip == Skip.ALWAYS && !idOrUpdateKey.isPresent()) {
+ LOGGER.warn("Record skipped because no id or update key value was specified");
+ continue;
+ } else if (skip == Skip.AUTO && idOrUpdateKey.isIdPresent()) {
+ LOGGER.warn(
+ "Record skipped because non existing id '"
+ + idOrUpdateKey.getValue()
+ + "' was specified");
+ continue;
+ } else if (skip == Skip.AUTO && !isId && !idOrUpdateKey.isUpdateKeyPresent()) {
+ LOGGER.warn("Record skipped because no update key value was specified");
+ continue;
+ } else if (idOrUpdateKey.isIdPresent()) {
+ LOGGER.warn(
+ "Record inserted though non existing id '"
+ + idOrUpdateKey.getValue()
+ + "' was specified");
+ } else if (!isId && !idOrUpdateKey.isUpdateKeyPresent()) {
+ LOGGER.warn("Record inserted though no update key value was specified");
+ }
+ if (recordForUpdate != null) {
+ updateRecords.add(recordForUpdate);
} else {
insertRecords.add(record);
}
if (insertRecords.size() == task.getChunkSize()) {
insert(insertRecords);
@@ -322,55 +310,57 @@
if (!updateRecords.isEmpty()) {
update(updateRecords);
}
}
- private List<String> getExistingValuesByUpdateKey(List<UpdateKey> updateKeys) {
- String fieldCode =
- updateKeys.stream()
- .map(UpdateKey::getField)
- .filter(Objects::nonNull)
- .findFirst()
- .orElse(null);
- if (fieldCode == null) {
- return Collections.emptyList();
- }
- Function<Record, String> fieldValueAsString;
- FieldType fieldType = getFieldType(fieldCode);
- if (fieldType == FieldType.SINGLE_LINE_TEXT) {
- fieldValueAsString = record -> record.getSingleLineTextFieldValue(fieldCode);
- } else if (fieldType == FieldType.NUMBER) {
- fieldValueAsString = record -> toString(record.getNumberFieldValue(fieldCode));
- } else {
- throw new ConfigException("The update_key must be 'SINGLE_LINE_TEXT' or 'NUMBER'.");
- }
+ private List<String> getExistingValuesByIdOrUpdateKey(
+ List<IdOrUpdateKey> idOrUpdateKeys, String columnName) {
List<String> queryValues =
- updateKeys.stream()
- .filter(k -> k.getValue() != null && !k.getValue().toString().isEmpty())
+ idOrUpdateKeys.stream()
+ .filter(IdOrUpdateKey::isPresent)
.map(k -> "\"" + k.getValue() + "\"")
.collect(Collectors.toList());
if (queryValues.isEmpty()) {
return Collections.emptyList();
}
+ return columnName.equals(Id.FIELD)
+ ? getExistingValuesById(queryValues)
+ : getExistingValuesByUpdateKey(columnName, queryValues);
+ }
+
+ private List<String> getExistingValuesById(List<String> queryValues) {
+ return getExistingValues(Id.FIELD, Record::getId, queryValues);
+ }
+
+ private List<String> getExistingValuesByUpdateKey(String columnName, List<String> queryValues) {
+ KintoneColumnOption option = task.getColumnOptions().get(columnName);
+ String fieldCode = option != null ? option.getFieldCode() : columnName;
+ KintoneColumnType type = KintoneColumnType.valueOf(getFieldType(fieldCode).name());
+ return getExistingValues(fieldCode, record -> type.getValue(record, fieldCode), queryValues);
+ }
+
+ private List<String> getExistingValues(
+ String fieldCode, Function<Record, Object> toValue, List<String> queryValues) {
String cursorId =
client
+ .get()
.record()
.createCursor(
task.getAppId(),
Collections.singletonList(fieldCode),
fieldCode + " in (" + String.join(",", queryValues) + ")");
- List<Record> allRecords = new ArrayList<>();
+ List<Record> records = new ArrayList<>();
while (true) {
- GetRecordsByCursorResponseBody resp = client.record().getRecordsByCursor(cursorId);
- List<Record> records = resp.getRecords();
- allRecords.addAll(records);
- if (!resp.hasNext()) {
+ GetRecordsByCursorResponseBody cursor = client.get().record().getRecordsByCursor(cursorId);
+ records.addAll(cursor.getRecords());
+ if (!cursor.hasNext()) {
break;
}
}
- return allRecords.stream()
- .map(fieldValueAsString)
+ return records.stream()
+ .map(toValue)
+ .map(KintonePageOutput::toString)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private void putWrongTypeFields(Record record) {
@@ -382,16 +372,14 @@
.filter(entry -> entry.getValue().getLeft() != entry.getValue().getRight())
.forEach(entry -> wrongTypeFields.put(entry.getKey(), entry.getValue()));
}
private FieldType getFieldType(String fieldCode) {
- connectIfNeeded();
- FieldProperty field = formFields.get(fieldCode);
- return field == null ? null : field.getType();
+ return client.get().getFieldType(fieldCode);
}
- private static boolean existsRecord(List<String> existingValues, UpdateKey updateKey) {
- String value = toString(updateKey.getValue());
+ private static boolean existsRecord(List<String> existingValues, IdOrUpdateKey idOrUpdateKey) {
+ String value = toString(idOrUpdateKey.getValue());
return value != null && existingValues.stream().anyMatch(v -> v.equals(value));
}
private static String toString(Object value) {
return value == null