src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.4.0 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.4.1
- old
+ new
@@ -1,36 +1,49 @@
package org.embulk.output.kintone;
+import static org.embulk.spi.util.RetryExecutor.retryExecutor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.kintone.client.KintoneClient;
import com.kintone.client.KintoneClientBuilder;
import com.kintone.client.api.record.GetRecordsByCursorResponseBody;
+import com.kintone.client.exception.KintoneApiRuntimeException;
import com.kintone.client.model.record.FieldType;
import com.kintone.client.model.record.Record;
import com.kintone.client.model.record.RecordForUpdate;
import com.kintone.client.model.record.UpdateKey;
+import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.embulk.config.ConfigException;
import org.embulk.config.TaskReport;
import org.embulk.spi.Column;
import org.embulk.spi.Exec;
import org.embulk.spi.Page;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
+import org.embulk.spi.util.RetryExecutor.RetryGiveupException;
+import org.embulk.spi.util.RetryExecutor.Retryable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KintonePageOutput implements TransactionalPageOutput {
public static final int UPSERT_BATCH_SIZE = 10000;
public static final int CHUNK_SIZE = 100;
private static final Logger LOGGER = LoggerFactory.getLogger(KintonePageOutput.class);
+ private static final List<String> RETRYABLE_ERROR_CODES =
+ Arrays.asList(
+ "GAIA_TM12", // 作成できるカーソルの上限に達しているため、カーソ ルを作成できません。不要なカーソルを削除するか、しばらく経ってから再実行してください。
+ "GAIA_RE18", // データベースのロックに失敗したため、変更を保存できませんでした。時間をおいて再度お試しください。
+ "GAIA_DA02" // データベースのロックに失敗したため、変更を保存できませんでした。時間をおいて再度お試しください。
+ );
private final PageReader pageReader;
private final PluginTask task;
private KintoneClient client;
public KintonePageOutput(PluginTask task, Schema schema) {
@@ -102,137 +115,180 @@
} else if (task.getToken().isPresent()) {
this.client = builder.authByApiToken(task.getToken().get()).build();
}
}
+ private void update(ArrayList<RecordForUpdate> records) {
+ execute(
+ client -> {
+ client.record().updateRecords(task.getAppId(), records);
+ });
+ }
+
+ private void insert(ArrayList<Record> records) {
+ execute(
+ client -> {
+ client.record().addRecords(task.getAppId(), records);
+ });
+ }
+
private void execute(Consumer<KintoneClient> operation) {
connect(task);
- if (this.client != null) {
- operation.accept(this.client);
- } else {
+ if (this.client == null) {
throw new RuntimeException("Failed to connect to kintone.");
}
+ KintoneRetryOption retryOption = task.getRetryOptions();
+ try {
+ retryExecutor()
+ .withRetryLimit(retryOption.getLimit())
+ .withInitialRetryWait(retryOption.getInitialWaitMillis())
+ .withMaxRetryWait(retryOption.getMaxWaitMillis())
+ .runInterruptible(
+ new Retryable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ operation.accept(client);
+ return null;
+ }
+
+ @Override
+ public boolean isRetryableException(Exception e) {
+ if (!(e instanceof KintoneApiRuntimeException)) {
+ return false;
+ }
+
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode content =
+ mapper.readTree(((KintoneApiRuntimeException) e).getContent());
+ String code = content.get("code").textValue();
+ return RETRYABLE_ERROR_CODES.contains(code);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void onRetry(
+ Exception exception, int retryCount, int retryLimit, int retryWait)
+ throws RetryGiveupException {
+ String message =
+ String.format(
+ "Retrying %d/%d after %d seconds. Message: %s",
+ retryCount, retryLimit, retryWait / 1000, exception.getMessage());
+ if (retryCount % 3 == 0) {
+ LOGGER.warn(message, exception);
+ } else {
+ LOGGER.warn(message);
+ }
+ }
+
+ @Override
+ public void onGiveup(Exception firstException, Exception lastException)
+ throws RetryGiveupException {}
+ });
+ } catch (RetryGiveupException | InterruptedException e) {
+ throw new RuntimeException("kintone throw exception", e);
+ }
}
private void insertPage(final Page page) {
- execute(
- client -> {
- try {
- ArrayList<Record> records = new ArrayList<>();
- pageReader.setPage(page);
- KintoneColumnVisitor visitor =
- new KintoneColumnVisitor(pageReader, task.getColumnOptions());
- while (pageReader.nextRecord()) {
- Record record = new Record();
- visitor.setRecord(record);
- for (Column column : pageReader.getSchema().getColumns()) {
- column.visit(visitor);
- }
- records.add(record);
- if (records.size() == CHUNK_SIZE) {
- client.record().addRecords(task.getAppId(), records);
- records.clear();
- sleep();
- }
- }
- if (records.size() > 0) {
- client.record().addRecords(task.getAppId(), records);
- }
- } catch (Exception e) {
- throw new RuntimeException("kintone throw exception", e);
- }
- });
+ ArrayList<Record> records = new ArrayList<>();
+ pageReader.setPage(page);
+ KintoneColumnVisitor visitor = new KintoneColumnVisitor(pageReader, task.getColumnOptions());
+ while (pageReader.nextRecord()) {
+ Record record = new Record();
+ visitor.setRecord(record);
+ for (Column column : pageReader.getSchema().getColumns()) {
+ column.visit(visitor);
+ }
+
+ records.add(record);
+ if (records.size() == CHUNK_SIZE) {
+ insert(records);
+ records.clear();
+ }
+ }
+ if (records.size() > 0) {
+ insert(records);
+ }
}
private void updatePage(final Page page) {
- execute(
- client -> {
- try {
- ArrayList<RecordForUpdate> updateRecords = new ArrayList<>();
- pageReader.setPage(page);
+ ArrayList<RecordForUpdate> updateRecords = new ArrayList<>();
+ pageReader.setPage(page);
- KintoneColumnVisitor visitor =
- new KintoneColumnVisitor(
- pageReader,
- task.getColumnOptions(),
- task.getUpdateKeyName()
- .orElseThrow(
- () -> new RuntimeException("unreachable"))); // Already validated
- while (pageReader.nextRecord()) {
- Record record = new Record();
- UpdateKey updateKey = new UpdateKey();
- visitor.setRecord(record);
- visitor.setUpdateKey(updateKey);
- for (Column column : pageReader.getSchema().getColumns()) {
- column.visit(visitor);
- }
+ KintoneColumnVisitor visitor =
+ new KintoneColumnVisitor(
+ pageReader,
+ task.getColumnOptions(),
+ task.getUpdateKeyName()
+ .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated
+ while (pageReader.nextRecord()) {
+ Record record = new Record();
+ UpdateKey updateKey = new UpdateKey();
+ visitor.setRecord(record);
+ visitor.setUpdateKey(updateKey);
+ for (Column column : pageReader.getSchema().getColumns()) {
+ column.visit(visitor);
+ }
- if (updateKey.getValue() == "") {
- continue;
- }
+ if (updateKey.getValue() == "") {
+ continue;
+ }
- record.removeField(updateKey.getField());
- updateRecords.add(new RecordForUpdate(updateKey, record));
- if (updateRecords.size() == CHUNK_SIZE) {
- client.record().updateRecords(task.getAppId(), updateRecords);
- updateRecords.clear();
- sleep();
- }
- }
- if (updateRecords.size() > 0) {
- client.record().updateRecords(task.getAppId(), updateRecords);
- }
- } catch (Exception e) {
- throw new RuntimeException("kintone throw exception", e);
- }
- });
+ record.removeField(updateKey.getField());
+ updateRecords.add(new RecordForUpdate(updateKey, record));
+ if (updateRecords.size() == CHUNK_SIZE) {
+ update(updateRecords);
+ updateRecords.clear();
+ }
+ }
+ if (updateRecords.size() > 0) {
+ update(updateRecords);
+ }
}
private void upsertPage(final Page page) {
execute(
client -> {
- try {
- ArrayList<Record> records = new ArrayList<>();
- ArrayList<UpdateKey> updateKeys = new ArrayList<>();
- pageReader.setPage(page);
+ ArrayList<Record> records = new ArrayList<>();
+ ArrayList<UpdateKey> updateKeys = new ArrayList<>();
+ pageReader.setPage(page);
- KintoneColumnVisitor visitor =
- new KintoneColumnVisitor(
- pageReader,
- task.getColumnOptions(),
- task.getUpdateKeyName()
- .orElseThrow(
- () -> new RuntimeException("unreachable"))); // Already validated
- while (pageReader.nextRecord()) {
- Record record = new Record();
- UpdateKey updateKey = new UpdateKey();
- visitor.setRecord(record);
- visitor.setUpdateKey(updateKey);
- for (Column column : pageReader.getSchema().getColumns()) {
- column.visit(visitor);
- }
- records.add(record);
- updateKeys.add(updateKey);
-
- if (records.size() == UPSERT_BATCH_SIZE) {
- upsert(records, updateKeys);
- records.clear();
- updateKeys.clear();
- }
+ KintoneColumnVisitor visitor =
+ new KintoneColumnVisitor(
+ pageReader,
+ task.getColumnOptions(),
+ task.getUpdateKeyName()
+ .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated
+ while (pageReader.nextRecord()) {
+ Record record = new Record();
+ UpdateKey updateKey = new UpdateKey();
+ visitor.setRecord(record);
+ visitor.setUpdateKey(updateKey);
+ for (Column column : pageReader.getSchema().getColumns()) {
+ column.visit(visitor);
}
- if (records.size() > 0) {
+ records.add(record);
+ updateKeys.add(updateKey);
+
+ if (records.size() == UPSERT_BATCH_SIZE) {
upsert(records, updateKeys);
+ records.clear();
+ updateKeys.clear();
}
- } catch (Exception e) {
- throw new RuntimeException("kintone throw exception", e);
}
+ if (records.size() > 0) {
+ upsert(records, updateKeys);
+ }
});
}
- private void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys)
- throws InterruptedException {
+ private void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys) {
if (records.size() != updateKeys.size()) {
throw new RuntimeException("records.size() != updateKeys.size()");
}
FieldType updateKeyFieldType =
client.app().getFormFields(task.getAppId()).get(updateKeys.get(0).getField()).getType();
@@ -272,24 +328,22 @@
} else {
insertRecords.add(record);
}
if (insertRecords.size() == CHUNK_SIZE) {
- client.record().addRecords(task.getAppId(), insertRecords);
+ insert(insertRecords);
insertRecords.clear();
- sleep();
} else if (updateRecords.size() == CHUNK_SIZE) {
- client.record().updateRecords(task.getAppId(), updateRecords);
+ update(updateRecords);
updateRecords.clear();
- sleep();
}
}
if (insertRecords.size() > 0) {
- client.record().addRecords(task.getAppId(), insertRecords);
+ insert(insertRecords);
}
if (updateRecords.size() > 0) {
- client.record().updateRecords(task.getAppId(), updateRecords);
+ update(updateRecords);
}
}
private List<Record> getExistingRecordsByUpdateKey(ArrayList<UpdateKey> updateKeys) {
String fieldCode = updateKeys.get(0).getField();
@@ -322,16 +376,7 @@
return allRecords;
}
private boolean existsRecord(List<String> distValues, UpdateKey updateKey) {
return distValues.stream().anyMatch(v -> v.equals(updateKey.getValue().toString()));
- }
-
- private void sleep() throws InterruptedException {
- if (!task.getIntervalSeconds().isPresent()) {
- return;
- }
- Integer interval = task.getIntervalSeconds().get();
- LOGGER.info(String.format("sleep %d seconds.", interval));
- TimeUnit.SECONDS.sleep(interval);
}
}