src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.4.0 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.4.1

- old
+ new

@@ -1,36 +1,49 @@ package org.embulk.output.kintone; +import static org.embulk.spi.util.RetryExecutor.retryExecutor; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.kintone.client.KintoneClient; import com.kintone.client.KintoneClientBuilder; import com.kintone.client.api.record.GetRecordsByCursorResponseBody; +import com.kintone.client.exception.KintoneApiRuntimeException; import com.kintone.client.model.record.FieldType; import com.kintone.client.model.record.Record; import com.kintone.client.model.record.RecordForUpdate; import com.kintone.client.model.record.UpdateKey; +import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.embulk.config.ConfigException; import org.embulk.config.TaskReport; import org.embulk.spi.Column; import org.embulk.spi.Exec; import org.embulk.spi.Page; import org.embulk.spi.PageReader; import org.embulk.spi.Schema; import org.embulk.spi.TransactionalPageOutput; +import org.embulk.spi.util.RetryExecutor.RetryGiveupException; +import org.embulk.spi.util.RetryExecutor.Retryable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KintonePageOutput implements TransactionalPageOutput { public static final int UPSERT_BATCH_SIZE = 10000; public static final int CHUNK_SIZE = 100; private static final Logger LOGGER = LoggerFactory.getLogger(KintonePageOutput.class); + private static final List<String> RETRYABLE_ERROR_CODES = + Arrays.asList( + "GAIA_TM12", // 作成できるカーソルの上限に達しているため、カーソ ルを作成できません。不要なカーソルを削除するか、しばらく経ってから再実行してください。 + "GAIA_RE18", // データベースのロックに失敗したため、変更を保存できませんでした。時間をおいて再度お試しください。 + "GAIA_DA02" // データベースのロックに失敗したため、変更を保存できませんでした。時間をおいて再度お試しください。 + ); private final PageReader pageReader; private final PluginTask task; private KintoneClient client; public KintonePageOutput(PluginTask task, Schema schema) { @@ -102,137 +115,180 @@ } else if (task.getToken().isPresent()) { this.client = builder.authByApiToken(task.getToken().get()).build(); } } + private void update(ArrayList<RecordForUpdate> records) { + execute( + client -> { + client.record().updateRecords(task.getAppId(), records); + }); + } + + private void insert(ArrayList<Record> records) { + execute( + client -> { + client.record().addRecords(task.getAppId(), records); + }); + } + private void execute(Consumer<KintoneClient> operation) { connect(task); - if (this.client != null) { - operation.accept(this.client); - } else { + if (this.client == null) { throw new RuntimeException("Failed to connect to kintone."); } + KintoneRetryOption retryOption = task.getRetryOptions(); + try { + retryExecutor() + .withRetryLimit(retryOption.getLimit()) + .withInitialRetryWait(retryOption.getInitialWaitMillis()) + .withMaxRetryWait(retryOption.getMaxWaitMillis()) + .runInterruptible( + new Retryable<Void>() { + + @Override + public Void call() throws Exception { + operation.accept(client); + return null; + } + + @Override + public boolean isRetryableException(Exception e) { + if (!(e instanceof KintoneApiRuntimeException)) { + return false; + } + + try { + ObjectMapper mapper = new ObjectMapper(); + JsonNode content = + mapper.readTree(((KintoneApiRuntimeException) e).getContent()); + String code = content.get("code").textValue(); + return RETRYABLE_ERROR_CODES.contains(code); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void onRetry( + Exception exception, int retryCount, int retryLimit, int retryWait) + throws RetryGiveupException { + String message = + String.format( + "Retrying %d/%d after %d seconds. Message: %s", + retryCount, retryLimit, retryWait / 1000, exception.getMessage()); + if (retryCount % 3 == 0) { + LOGGER.warn(message, exception); + } else { + LOGGER.warn(message); + } + } + + @Override + public void onGiveup(Exception firstException, Exception lastException) + throws RetryGiveupException {} + }); + } catch (RetryGiveupException | InterruptedException e) { + throw new RuntimeException("kintone throw exception", e); + } } private void insertPage(final Page page) { - execute( - client -> { - try { - ArrayList<Record> records = new ArrayList<>(); - pageReader.setPage(page); - KintoneColumnVisitor visitor = - new KintoneColumnVisitor(pageReader, task.getColumnOptions()); - while (pageReader.nextRecord()) { - Record record = new Record(); - visitor.setRecord(record); - for (Column column : pageReader.getSchema().getColumns()) { - column.visit(visitor); - } - records.add(record); - if (records.size() == CHUNK_SIZE) { - client.record().addRecords(task.getAppId(), records); - records.clear(); - sleep(); - } - } - if (records.size() > 0) { - client.record().addRecords(task.getAppId(), records); - } - } catch (Exception e) { - throw new RuntimeException("kintone throw exception", e); - } - }); + ArrayList<Record> records = new ArrayList<>(); + pageReader.setPage(page); + KintoneColumnVisitor visitor = new KintoneColumnVisitor(pageReader, task.getColumnOptions()); + while (pageReader.nextRecord()) { + Record record = new Record(); + visitor.setRecord(record); + for (Column column : pageReader.getSchema().getColumns()) { + column.visit(visitor); + } + + records.add(record); + if (records.size() == CHUNK_SIZE) { + insert(records); + records.clear(); + } + } + if (records.size() > 0) { + insert(records); + } } private void updatePage(final Page page) { - execute( - client -> { - try { - ArrayList<RecordForUpdate> updateRecords = new ArrayList<>(); - pageReader.setPage(page); + ArrayList<RecordForUpdate> updateRecords = new ArrayList<>(); + pageReader.setPage(page); - KintoneColumnVisitor visitor = - new KintoneColumnVisitor( - pageReader, - task.getColumnOptions(), - task.getUpdateKeyName() - .orElseThrow( - () -> new RuntimeException("unreachable"))); // Already validated - while (pageReader.nextRecord()) { - Record record = new Record(); - UpdateKey updateKey = new UpdateKey(); - visitor.setRecord(record); - visitor.setUpdateKey(updateKey); - for (Column column : pageReader.getSchema().getColumns()) { - column.visit(visitor); - } + KintoneColumnVisitor visitor = + new KintoneColumnVisitor( + pageReader, + task.getColumnOptions(), + task.getUpdateKeyName() + .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated + while (pageReader.nextRecord()) { + Record record = new Record(); + UpdateKey updateKey = new UpdateKey(); + visitor.setRecord(record); + visitor.setUpdateKey(updateKey); + for (Column column : pageReader.getSchema().getColumns()) { + column.visit(visitor); + } - if (updateKey.getValue() == "") { - continue; - } + if (updateKey.getValue() == "") { + continue; + } - record.removeField(updateKey.getField()); - updateRecords.add(new RecordForUpdate(updateKey, record)); - if (updateRecords.size() == CHUNK_SIZE) { - client.record().updateRecords(task.getAppId(), updateRecords); - updateRecords.clear(); - sleep(); - } - } - if (updateRecords.size() > 0) { - client.record().updateRecords(task.getAppId(), updateRecords); - } - } catch (Exception e) { - throw new RuntimeException("kintone throw exception", e); - } - }); + record.removeField(updateKey.getField()); + updateRecords.add(new RecordForUpdate(updateKey, record)); + if (updateRecords.size() == CHUNK_SIZE) { + update(updateRecords); + updateRecords.clear(); + } + } + if (updateRecords.size() > 0) { + update(updateRecords); + } } private void upsertPage(final Page page) { execute( client -> { - try { - ArrayList<Record> records = new ArrayList<>(); - ArrayList<UpdateKey> updateKeys = new ArrayList<>(); - pageReader.setPage(page); + ArrayList<Record> records = new ArrayList<>(); + ArrayList<UpdateKey> updateKeys = new ArrayList<>(); + pageReader.setPage(page); - KintoneColumnVisitor visitor = - new KintoneColumnVisitor( - pageReader, - task.getColumnOptions(), - task.getUpdateKeyName() - .orElseThrow( - () -> new RuntimeException("unreachable"))); // Already validated - while (pageReader.nextRecord()) { - Record record = new Record(); - UpdateKey updateKey = new UpdateKey(); - visitor.setRecord(record); - visitor.setUpdateKey(updateKey); - for (Column column : pageReader.getSchema().getColumns()) { - column.visit(visitor); - } - records.add(record); - updateKeys.add(updateKey); - - if (records.size() == UPSERT_BATCH_SIZE) { - upsert(records, updateKeys); - records.clear(); - updateKeys.clear(); - } + KintoneColumnVisitor visitor = + new KintoneColumnVisitor( + pageReader, + task.getColumnOptions(), + task.getUpdateKeyName() + .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated + while (pageReader.nextRecord()) { + Record record = new Record(); + UpdateKey updateKey = new UpdateKey(); + visitor.setRecord(record); + visitor.setUpdateKey(updateKey); + for (Column column : pageReader.getSchema().getColumns()) { + column.visit(visitor); } - if (records.size() > 0) { + records.add(record); + updateKeys.add(updateKey); + + if (records.size() == UPSERT_BATCH_SIZE) { upsert(records, updateKeys); + records.clear(); + updateKeys.clear(); } - } catch (Exception e) { - throw new RuntimeException("kintone throw exception", e); } + if (records.size() > 0) { + upsert(records, updateKeys); + } }); } - private void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys) - throws InterruptedException { + private void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys) { if (records.size() != updateKeys.size()) { throw new RuntimeException("records.size() != updateKeys.size()"); } FieldType updateKeyFieldType = client.app().getFormFields(task.getAppId()).get(updateKeys.get(0).getField()).getType(); @@ -272,24 +328,22 @@ } else { insertRecords.add(record); } if (insertRecords.size() == CHUNK_SIZE) { - client.record().addRecords(task.getAppId(), insertRecords); + insert(insertRecords); insertRecords.clear(); - sleep(); } else if (updateRecords.size() == CHUNK_SIZE) { - client.record().updateRecords(task.getAppId(), updateRecords); + update(updateRecords); updateRecords.clear(); - sleep(); } } if (insertRecords.size() > 0) { - client.record().addRecords(task.getAppId(), insertRecords); + insert(insertRecords); } if (updateRecords.size() > 0) { - client.record().updateRecords(task.getAppId(), updateRecords); + update(updateRecords); } } private List<Record> getExistingRecordsByUpdateKey(ArrayList<UpdateKey> updateKeys) { String fieldCode = updateKeys.get(0).getField(); @@ -322,16 +376,7 @@ return allRecords; } private boolean existsRecord(List<String> distValues, UpdateKey updateKey) { return distValues.stream().anyMatch(v -> v.equals(updateKey.getValue().toString())); - } - - private void sleep() throws InterruptedException { - if (!task.getIntervalSeconds().isPresent()) { - return; - } - Integer interval = task.getIntervalSeconds().get(); - LOGGER.info(String.format("sleep %d seconds.", interval)); - TimeUnit.SECONDS.sleep(interval); } }