src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.4.1 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-1.0.0
- old
+ new
@@ -2,28 +2,36 @@
import static org.embulk.spi.util.RetryExecutor.retryExecutor;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
import com.kintone.client.KintoneClient;
import com.kintone.client.KintoneClientBuilder;
import com.kintone.client.api.record.GetRecordsByCursorResponseBody;
import com.kintone.client.exception.KintoneApiRuntimeException;
+import com.kintone.client.model.app.field.FieldProperty;
import com.kintone.client.model.record.FieldType;
import com.kintone.client.model.record.Record;
import com.kintone.client.model.record.RecordForUpdate;
import com.kintone.client.model.record.UpdateKey;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
import org.embulk.config.ConfigException;
import org.embulk.config.TaskReport;
-import org.embulk.spi.Column;
import org.embulk.spi.Exec;
import org.embulk.spi.Page;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
@@ -31,26 +39,29 @@
import org.embulk.spi.util.RetryExecutor.Retryable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KintonePageOutput implements TransactionalPageOutput {
- public static final int UPSERT_BATCH_SIZE = 10000;
- public static final int CHUNK_SIZE = 100;
- private static final Logger LOGGER = LoggerFactory.getLogger(KintonePageOutput.class);
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final List<String> RETRYABLE_ERROR_CODES =
Arrays.asList(
- "GAIA_TM12", // 作成できるカーソルの上限に達しているため、カーソ ルを作成できません。不要なカーソルを削除するか、しばらく経ってから再実行してください。
+ "GAIA_TM12", // 作成できるカーソルの上限に達しているため、カーソルを作成できません。不要なカーソルを削除するか、しばらく経ってから再実行してください。
"GAIA_RE18", // データベースのロックに失敗したため、変更を保存できませんでした。時間をおいて再度お試しください。
"GAIA_DA02" // データベースのロックに失敗したため、変更を保存できませんでした。時間をおいて再度お試しください。
);
- private final PageReader pageReader;
+ private static final int UPSERT_BATCH_SIZE = 10000;
+ private static final int CHUNK_SIZE = 100;
+ private final Map<String, Pair<FieldType, FieldType>> wrongTypeFields = new TreeMap<>();
private final PluginTask task;
+ private final PageReader reader;
private KintoneClient client;
+ private Map<String, FieldProperty> formFields;
public KintonePageOutput(PluginTask task, Schema schema) {
- this.pageReader = new PageReader(schema);
this.task = task;
+ reader = new PageReader(schema);
}
@Override
public void add(Page page) {
KintoneMode mode = KintoneMode.getKintoneModeByValue(task.getMode());
@@ -74,15 +85,15 @@
// noop
}
@Override
public void close() {
- if (this.client == null) {
- return;
+ if (client == null) {
+ return; // Not connected
}
try {
- this.client.close();
+ client.close();
} catch (Exception e) {
throw new RuntimeException("kintone throw exception", e);
}
}
@@ -91,82 +102,77 @@
// noop
}
@Override
public TaskReport commit() {
+ wrongTypeFields.forEach(
+ (key, value) ->
+ LOGGER.warn(
+ String.format(
+ "Field type of %s is expected %s but actual %s",
+ key, value.getLeft(), value.getRight())));
return Exec.newTaskReport();
}
- public interface Consumer<T> {
- void accept(T t);
- }
-
- public void connect(final PluginTask task) {
+ public void connectIfNeeded() {
+ if (client != null) {
+ return; // Already connected
+ }
KintoneClientBuilder builder = KintoneClientBuilder.create("https://" + task.getDomain());
if (task.getGuestSpaceId().isPresent()) {
- builder.setGuestSpaceId(task.getGuestSpaceId().orElse(-1));
+ builder.setGuestSpaceId(task.getGuestSpaceId().get());
}
if (task.getBasicAuthUsername().isPresent() && task.getBasicAuthPassword().isPresent()) {
builder.withBasicAuth(task.getBasicAuthUsername().get(), task.getBasicAuthPassword().get());
}
-
if (task.getUsername().isPresent() && task.getPassword().isPresent()) {
- this.client =
- builder.authByPassword(task.getUsername().get(), task.getPassword().get()).build();
+ builder.authByPassword(task.getUsername().get(), task.getPassword().get());
} else if (task.getToken().isPresent()) {
- this.client = builder.authByApiToken(task.getToken().get()).build();
+ builder.authByApiToken(task.getToken().get());
+ } else {
+ throw new ConfigException("Username and password or token must be configured.");
}
+ client = builder.build();
+ formFields = client.app().getFormFields(task.getAppId());
}
- private void update(ArrayList<RecordForUpdate> records) {
- execute(
- client -> {
- client.record().updateRecords(task.getAppId(), records);
- });
+ private void insert(List<Record> records) {
+ executeWithRetry(() -> client.record().addRecords(task.getAppId(), records));
}
- private void insert(ArrayList<Record> records) {
- execute(
- client -> {
- client.record().addRecords(task.getAppId(), records);
- });
+ private void update(List<RecordForUpdate> records) {
+ executeWithRetry(() -> client.record().updateRecords(task.getAppId(), records));
}
- private void execute(Consumer<KintoneClient> operation) {
- connect(task);
- if (this.client == null) {
- throw new RuntimeException("Failed to connect to kintone.");
- }
+ private <T> T executeWithRetry(Supplier<T> operation) {
+ connectIfNeeded();
KintoneRetryOption retryOption = task.getRetryOptions();
try {
- retryExecutor()
+ return retryExecutor()
.withRetryLimit(retryOption.getLimit())
.withInitialRetryWait(retryOption.getInitialWaitMillis())
.withMaxRetryWait(retryOption.getMaxWaitMillis())
.runInterruptible(
- new Retryable<Void>() {
-
+ new Retryable<T>() {
@Override
- public Void call() throws Exception {
- operation.accept(client);
- return null;
+ public T call() throws Exception {
+ return operation.get();
}
@Override
- public boolean isRetryableException(Exception e) {
- if (!(e instanceof KintoneApiRuntimeException)) {
+ public boolean isRetryableException(Exception exception) {
+ if (!(exception instanceof KintoneApiRuntimeException)) {
return false;
}
-
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode content =
- mapper.readTree(((KintoneApiRuntimeException) e).getContent());
+ mapper.readTree(((KintoneApiRuntimeException) exception).getContent());
String code = content.get("code").textValue();
return RETRYABLE_ERROR_CODES.contains(code);
- } catch (IOException ex) {
- throw new RuntimeException(ex);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
@Override
public void onRetry(
@@ -190,193 +196,199 @@
} catch (RetryGiveupException | InterruptedException e) {
throw new RuntimeException("kintone throw exception", e);
}
}
- private void insertPage(final Page page) {
-
- ArrayList<Record> records = new ArrayList<>();
- pageReader.setPage(page);
- KintoneColumnVisitor visitor = new KintoneColumnVisitor(pageReader, task.getColumnOptions());
- while (pageReader.nextRecord()) {
+ private void insertPage(Page page) {
+ List<Record> records = new ArrayList<>();
+ reader.setPage(page);
+ KintoneColumnVisitor visitor =
+ new KintoneColumnVisitor(
+ reader, task.getColumnOptions(), task.getPreferNulls(), task.getIgnoreNulls());
+ while (reader.nextRecord()) {
Record record = new Record();
visitor.setRecord(record);
- for (Column column : pageReader.getSchema().getColumns()) {
- column.visit(visitor);
- }
-
+ reader.getSchema().visitColumns(visitor);
+ putWrongTypeFields(record);
records.add(record);
if (records.size() == CHUNK_SIZE) {
insert(records);
records.clear();
}
}
- if (records.size() > 0) {
+ if (!records.isEmpty()) {
insert(records);
}
}
- private void updatePage(final Page page) {
- ArrayList<RecordForUpdate> updateRecords = new ArrayList<>();
- pageReader.setPage(page);
-
+ private void updatePage(Page page) {
+ List<RecordForUpdate> records = new ArrayList<>();
+ reader.setPage(page);
KintoneColumnVisitor visitor =
new KintoneColumnVisitor(
- pageReader,
+ reader,
task.getColumnOptions(),
+ task.getPreferNulls(),
+ task.getIgnoreNulls(),
task.getUpdateKeyName()
.orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated
- while (pageReader.nextRecord()) {
+ while (reader.nextRecord()) {
Record record = new Record();
UpdateKey updateKey = new UpdateKey();
visitor.setRecord(record);
visitor.setUpdateKey(updateKey);
- for (Column column : pageReader.getSchema().getColumns()) {
- column.visit(visitor);
- }
-
- if (updateKey.getValue() == "") {
+ reader.getSchema().visitColumns(visitor);
+ putWrongTypeFields(record);
+ if (updateKey.getValue() == null || updateKey.getValue().toString().isEmpty()) {
+ LOGGER.warn("Record skipped because no update key value was specified");
continue;
}
-
- record.removeField(updateKey.getField());
- updateRecords.add(new RecordForUpdate(updateKey, record));
- if (updateRecords.size() == CHUNK_SIZE) {
- update(updateRecords);
- updateRecords.clear();
+ records.add(new RecordForUpdate(updateKey, record.removeField(updateKey.getField())));
+ if (records.size() == CHUNK_SIZE) {
+ update(records);
+ records.clear();
}
}
- if (updateRecords.size() > 0) {
- update(updateRecords);
+ if (!records.isEmpty()) {
+ update(records);
}
}
- private void upsertPage(final Page page) {
- execute(
- client -> {
- ArrayList<Record> records = new ArrayList<>();
- ArrayList<UpdateKey> updateKeys = new ArrayList<>();
- pageReader.setPage(page);
-
- KintoneColumnVisitor visitor =
- new KintoneColumnVisitor(
- pageReader,
- task.getColumnOptions(),
- task.getUpdateKeyName()
- .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated
- while (pageReader.nextRecord()) {
- Record record = new Record();
- UpdateKey updateKey = new UpdateKey();
- visitor.setRecord(record);
- visitor.setUpdateKey(updateKey);
- for (Column column : pageReader.getSchema().getColumns()) {
- column.visit(visitor);
- }
- records.add(record);
- updateKeys.add(updateKey);
-
- if (records.size() == UPSERT_BATCH_SIZE) {
- upsert(records, updateKeys);
- records.clear();
- updateKeys.clear();
- }
- }
- if (records.size() > 0) {
- upsert(records, updateKeys);
- }
- });
+ private void upsertPage(Page page) {
+ List<Record> records = new ArrayList<>();
+ List<UpdateKey> updateKeys = new ArrayList<>();
+ reader.setPage(page);
+ KintoneColumnVisitor visitor =
+ new KintoneColumnVisitor(
+ reader,
+ task.getColumnOptions(),
+ task.getPreferNulls(),
+ task.getIgnoreNulls(),
+ task.getUpdateKeyName()
+ .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated
+ while (reader.nextRecord()) {
+ Record record = new Record();
+ UpdateKey updateKey = new UpdateKey();
+ visitor.setRecord(record);
+ visitor.setUpdateKey(updateKey);
+ reader.getSchema().visitColumns(visitor);
+ putWrongTypeFields(record);
+ records.add(record);
+ updateKeys.add(updateKey);
+ if (records.size() == UPSERT_BATCH_SIZE) {
+ upsert(records, updateKeys);
+ records.clear();
+ updateKeys.clear();
+ }
+ }
+ if (!records.isEmpty()) {
+ upsert(records, updateKeys);
+ }
}
- private void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys) {
+ private void upsert(List<Record> records, List<UpdateKey> updateKeys) {
if (records.size() != updateKeys.size()) {
throw new RuntimeException("records.size() != updateKeys.size()");
}
- FieldType updateKeyFieldType =
- client.app().getFormFields(task.getAppId()).get(updateKeys.get(0).getField()).getType();
- if (!Arrays.asList(FieldType.SINGLE_LINE_TEXT, FieldType.NUMBER).contains(updateKeyFieldType)) {
- throw new ConfigException("The update_key must be 'SINGLE_LINE_TEXT' or 'NUMBER'.");
- }
-
- List<Record> existingRecords = getExistingRecordsByUpdateKey(updateKeys);
- String updateField = updateKeys.get(0).getField();
- List<String> existingValues =
- existingRecords.stream()
- .map(
- (r) -> {
- switch (updateKeyFieldType) {
- case SINGLE_LINE_TEXT:
- String s = r.getSingleLineTextFieldValue(updateField);
- return s == null ? null : s.toString();
- case NUMBER:
- BigDecimal bd = r.getNumberFieldValue(updateField);
- return bd == null ? null : bd.toPlainString();
- default:
- return null;
- }
- })
- .filter(v -> v != null)
- .collect(Collectors.toList());
-
- ArrayList<Record> insertRecords = new ArrayList<>();
- ArrayList<RecordForUpdate> updateRecords = new ArrayList<>();
+ List<String> existingValues = executeWithRetry(() -> getExistingValuesByUpdateKey(updateKeys));
+ List<Record> insertRecords = new ArrayList<>();
+ List<RecordForUpdate> updateRecords = new ArrayList<>();
for (int i = 0; i < records.size(); i++) {
Record record = records.get(i);
UpdateKey updateKey = updateKeys.get(i);
-
if (existsRecord(existingValues, updateKey)) {
- record.removeField(updateKey.getField());
- updateRecords.add(new RecordForUpdate(updateKey, record));
+ updateRecords.add(new RecordForUpdate(updateKey, record.removeField(updateKey.getField())));
} else {
insertRecords.add(record);
}
-
if (insertRecords.size() == CHUNK_SIZE) {
insert(insertRecords);
insertRecords.clear();
} else if (updateRecords.size() == CHUNK_SIZE) {
update(updateRecords);
updateRecords.clear();
}
}
- if (insertRecords.size() > 0) {
+ if (!insertRecords.isEmpty()) {
insert(insertRecords);
}
- if (updateRecords.size() > 0) {
+ if (!updateRecords.isEmpty()) {
update(updateRecords);
}
}
- private List<Record> getExistingRecordsByUpdateKey(ArrayList<UpdateKey> updateKeys) {
- String fieldCode = updateKeys.get(0).getField();
+ private List<String> getExistingValuesByUpdateKey(List<UpdateKey> updateKeys) {
+ String fieldCode =
+ updateKeys.stream()
+ .map(UpdateKey::getField)
+ .filter(Objects::nonNull)
+ .findFirst()
+ .orElse(null);
+ if (fieldCode == null) {
+ return Collections.emptyList();
+ }
+ Function<Record, String> fieldValueAsString;
+ FieldType fieldType = getFieldType(fieldCode);
+ if (fieldType == FieldType.SINGLE_LINE_TEXT) {
+ fieldValueAsString = record -> record.getSingleLineTextFieldValue(fieldCode);
+ } else if (fieldType == FieldType.NUMBER) {
+ fieldValueAsString = record -> toString(record.getNumberFieldValue(fieldCode));
+ } else {
+ throw new ConfigException("The update_key must be 'SINGLE_LINE_TEXT' or 'NUMBER'.");
+ }
List<String> queryValues =
updateKeys.stream()
- .filter(k -> k.getValue() != "")
- .map(k -> "\"" + k.getValue().toString() + "\"")
+ .filter(k -> k.getValue() != null && !k.getValue().toString().isEmpty())
+ .map(k -> "\"" + k.getValue() + "\"")
.collect(Collectors.toList());
-
- List<Record> allRecords = new ArrayList<>();
if (queryValues.isEmpty()) {
- return allRecords;
+ return Collections.emptyList();
}
String cursorId =
client
.record()
.createCursor(
task.getAppId(),
Collections.singletonList(fieldCode),
fieldCode + " in (" + String.join(",", queryValues) + ")");
+ List<Record> allRecords = new ArrayList<>();
while (true) {
GetRecordsByCursorResponseBody resp = client.record().getRecordsByCursor(cursorId);
List<Record> records = resp.getRecords();
allRecords.addAll(records);
-
if (!resp.hasNext()) {
break;
}
}
- return allRecords;
+ return allRecords.stream()
+ .map(fieldValueAsString)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
}
- private boolean existsRecord(List<String> distValues, UpdateKey updateKey) {
- return distValues.stream().anyMatch(v -> v.equals(updateKey.getValue().toString()));
+ private boolean existsRecord(List<String> existingValues, UpdateKey updateKey) {
+ String value = toString(updateKey.getValue());
+ return value != null && existingValues.stream().anyMatch(v -> v.equals(value));
+ }
+
+ private void putWrongTypeFields(Record record) {
+ record.getFieldCodes(true).stream()
+ .map(
+ fieldCode ->
+ Maps.immutableEntry(
+ fieldCode, Pair.of(record.getFieldType(fieldCode), getFieldType(fieldCode))))
+ .filter(entry -> entry.getValue().getLeft() != entry.getValue().getRight())
+ .forEach(entry -> wrongTypeFields.put(entry.getKey(), entry.getValue()));
+ }
+
+ private FieldType getFieldType(String fieldCode) {
+ connectIfNeeded();
+ FieldProperty field = formFields.get(fieldCode);
+ return field == null ? null : field.getType();
+ }
+
+ private static String toString(Object value) {
+ return value == null
+ ? null
+ : value instanceof BigDecimal ? ((BigDecimal) value).toPlainString() : value.toString();
}
}