package org.embulk.output.kintone;

import static org.embulk.spi.util.RetryExecutor.retryExecutor;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kintone.client.KintoneClient;
import com.kintone.client.KintoneClientBuilder;
import com.kintone.client.api.record.GetRecordsByCursorResponseBody;
import com.kintone.client.exception.KintoneApiRuntimeException;
import com.kintone.client.model.record.FieldType;
import com.kintone.client.model.record.Record;
import com.kintone.client.model.record.RecordForUpdate;
import com.kintone.client.model.record.UpdateKey;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.embulk.config.ConfigException;
import org.embulk.config.TaskReport;
import org.embulk.spi.Column;
import org.embulk.spi.Exec;
import org.embulk.spi.Page;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
import org.embulk.spi.util.RetryExecutor.RetryGiveupException;
import org.embulk.spi.util.RetryExecutor.Retryable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Embulk page output that writes each incoming page to a kintone app.
 *
 * <p>Depending on the configured mode, the records of a page are inserted, updated by an update
 * key, or upserted (updated when a record with the same update key already exists, inserted
 * otherwise). Requests are sent in chunks of {@link #CHUNK_SIZE} records and are retried for the
 * error codes listed in {@code RETRYABLE_ERROR_CODES}.
 */
public class KintonePageOutput implements TransactionalPageOutput {
  /** Number of input records that are looked up and upserted together. */
  public static final int UPSERT_BATCH_SIZE = 10000;

  /** Number of records sent in a single add/update request. */
  public static final int CHUNK_SIZE = 100;

  private static final Logger LOGGER = LoggerFactory.getLogger(KintonePageOutput.class);

  /** Shared mapper used only to parse error response bodies. */
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  /** kintone error codes for which a failed request is retried. */
  private static final List<String> RETRYABLE_ERROR_CODES =
      Collections.unmodifiableList(
          Arrays.asList(
              // The cursor could not be created because the maximum number of cursors has been
              // reached. Delete unneeded cursors or retry after a while.
              "GAIA_TM12",
              // Changes could not be saved because locking the database failed. Retry after a
              // while.
              "GAIA_RE18",
              // Changes could not be saved because locking the database failed. Retry after a
              // while.
              "GAIA_DA02"));

  private final PageReader pageReader;
  private final PluginTask task;
  private KintoneClient client;

  /**
   * Creates an output for the given task.
   *
   * @param task plugin configuration
   * @param schema schema of the pages passed to {@link #add(Page)}
   */
  public KintonePageOutput(PluginTask task, Schema schema) {
    this.pageReader = new PageReader(schema);
    this.task = task;
  }

  /**
   * Writes one page to kintone using the configured mode.
   *
   * @param page page to write
   * @throws UnsupportedOperationException if the mode is not insert, update or upsert
   */
  @Override
  public void add(Page page) {
    KintoneMode mode = KintoneMode.getKintoneModeByValue(task.getMode());
    switch (mode) {
      case INSERT:
        insertPage(page);
        break;
      case UPDATE:
        updatePage(page);
        break;
      case UPSERT:
        upsertPage(page);
        break;
      default:
        throw new UnsupportedOperationException(
            String.format("Unknown mode '%s'", task.getMode()));
    }
  }

  @Override
  public void finish() {
    // noop
  }

  /**
   * Closes the kintone client if one was created.
   *
   * @throws RuntimeException if closing the client fails
   */
  @Override
  public void close() {
    if (this.client == null) {
      return;
    }
    try {
      this.client.close();
    } catch (Exception e) {
      throw new RuntimeException("kintone throw exception", e);
    } finally {
      // Drop the reference so that a closed client is never reused by execute().
      this.client = null;
    }
  }

  @Override
  public void abort() {
    // noop
  }

  @Override
  public TaskReport commit() {
    return Exec.newTaskReport();
  }

  /**
   * Operation that receives a value and returns nothing.
   *
   * @param <T> type of the accepted value
   */
  @FunctionalInterface
  public interface Consumer<T> {
    void accept(T t);
  }

  /**
   * Builds a kintone client from the task settings and stores it in this output.
   *
   * <p>Password authentication is used when both username and password are present; otherwise an
   * API token is used when present. When neither is configured, no client is created and the
   * current one (if any) is left untouched.
   *
   * @param task plugin configuration holding domain and credentials
   */
  public void connect(final PluginTask task) {
    KintoneClientBuilder builder = KintoneClientBuilder.create("https://" + task.getDomain());
    if (task.getGuestSpaceId().isPresent()) {
      builder.setGuestSpaceId(task.getGuestSpaceId().orElse(-1));
    }
    if (task.getBasicAuthUsername().isPresent() && task.getBasicAuthPassword().isPresent()) {
      builder.withBasicAuth(task.getBasicAuthUsername().get(), task.getBasicAuthPassword().get());
    }
    if (task.getUsername().isPresent() && task.getPassword().isPresent()) {
      this.client =
          builder.authByPassword(task.getUsername().get(), task.getPassword().get()).build();
    } else if (task.getToken().isPresent()) {
      this.client = builder.authByApiToken(task.getToken().get()).build();
    }
  }

  /** Sends one update request for the given records, with retries. */
  private void update(ArrayList<RecordForUpdate> records) {
    execute(kintone -> kintone.record().updateRecords(task.getAppId(), records));
  }

  /** Sends one add request for the given records, with retries. */
  private void insert(ArrayList<Record> records) {
    execute(kintone -> kintone.record().addRecords(task.getAppId(), records));
  }

  /**
   * Runs an operation against the kintone client using the task's retry options.
   *
   * @param operation work to perform with the client
   * @throws RuntimeException if no client can be created, the retries are exhausted, the
   *     operation fails with a non-retryable error, or the thread is interrupted
   */
  private void execute(Consumer<KintoneClient> operation) {
    // Connect lazily and reuse the client; building a new client for every chunk would leave
    // the previous ones unclosed.
    if (this.client == null) {
      connect(task);
    }
    if (this.client == null) {
      throw new RuntimeException("Failed to connect to kintone.");
    }
    KintoneRetryOption retryOption = task.getRetryOptions();
    try {
      retryExecutor()
          .withRetryLimit(retryOption.getLimit())
          .withInitialRetryWait(retryOption.getInitialWaitMillis())
          .withMaxRetryWait(retryOption.getMaxWaitMillis())
          .runInterruptible(
              new Retryable<Void>() {
                @Override
                public Void call() throws Exception {
                  operation.accept(client);
                  return null;
                }

                @Override
                public boolean isRetryableException(Exception e) {
                  if (!(e instanceof KintoneApiRuntimeException)) {
                    return false;
                  }
                  try {
                    JsonNode content =
                        OBJECT_MAPPER.readTree(((KintoneApiRuntimeException) e).getContent());
                    JsonNode code = content == null ? null : content.get("code");
                    return code != null && RETRYABLE_ERROR_CODES.contains(code.textValue());
                  } catch (IOException | RuntimeException ex) {
                    // A missing or non-JSON body cannot carry a retryable code. Report the
                    // error as non-retryable so the original API exception is what surfaces.
                    LOGGER.debug("Could not parse kintone error response", ex);
                    return false;
                  }
                }

                @Override
                public void onRetry(
                    Exception exception, int retryCount, int retryLimit, int retryWait)
                    throws RetryGiveupException {
                  String message =
                      String.format(
                          "Retrying %d/%d after %d seconds. Message: %s",
                          retryCount, retryLimit, retryWait / 1000, exception.getMessage());
                  // Log the stack trace only on every third retry to keep the log readable.
                  if (retryCount % 3 == 0) {
                    LOGGER.warn(message, exception);
                  } else {
                    LOGGER.warn(message);
                  }
                }

                @Override
                public void onGiveup(Exception firstException, Exception lastException)
                    throws RetryGiveupException {}
              });
    } catch (RetryGiveupException e) {
      throw new RuntimeException("kintone throw exception", e);
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers further up can observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException("kintone throw exception", e);
    }
  }

  /** Converts every row of the page to a record and inserts them in chunks. */
  private void insertPage(final Page page) {
    ArrayList<Record> records = new ArrayList<>();
    pageReader.setPage(page);
    KintoneColumnVisitor visitor = new KintoneColumnVisitor(pageReader, task.getColumnOptions());
    while (pageReader.nextRecord()) {
      Record record = new Record();
      visitor.setRecord(record);
      for (Column column : pageReader.getSchema().getColumns()) {
        column.visit(visitor);
      }
      records.add(record);
      if (records.size() == CHUNK_SIZE) {
        insert(records);
        records.clear();
      }
    }
    if (!records.isEmpty()) {
      insert(records);
    }
  }

  /**
   * Converts every row of the page to an update request keyed by the update key and sends them
   * in chunks. Rows whose update key value is missing or empty are skipped.
   */
  private void updatePage(final Page page) {
    ArrayList<RecordForUpdate> updateRecords = new ArrayList<>();
    pageReader.setPage(page);
    KintoneColumnVisitor visitor =
        new KintoneColumnVisitor(
            pageReader,
            task.getColumnOptions(),
            task.getUpdateKeyName()
                .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated
    while (pageReader.nextRecord()) {
      Record record = new Record();
      UpdateKey updateKey = new UpdateKey();
      visitor.setRecord(record);
      visitor.setUpdateKey(updateKey);
      for (Column column : pageReader.getSchema().getColumns()) {
        column.visit(visitor);
      }
      // Compare by content: an identity check against "" misses empty strings read from data.
      if (hasNoKeyValue(updateKey)) {
        continue;
      }
      // The key field is sent as the update key, so it is removed from the record body.
      record.removeField(updateKey.getField());
      updateRecords.add(new RecordForUpdate(updateKey, record));
      if (updateRecords.size() == CHUNK_SIZE) {
        update(updateRecords);
        updateRecords.clear();
      }
    }
    if (!updateRecords.isEmpty()) {
      update(updateRecords);
    }
  }

  /**
   * Converts every row of the page to a record plus its update key and upserts them in batches
   * of {@link #UPSERT_BATCH_SIZE}. The whole page is processed inside one retryable operation.
   */
  private void upsertPage(final Page page) {
    execute(
        kintone -> {
          ArrayList<Record> records = new ArrayList<>();
          ArrayList<UpdateKey> updateKeys = new ArrayList<>();
          pageReader.setPage(page);
          KintoneColumnVisitor visitor =
              new KintoneColumnVisitor(
                  pageReader,
                  task.getColumnOptions(),
                  task.getUpdateKeyName()
                      .orElseThrow(
                          () -> new RuntimeException("unreachable"))); // Already validated
          while (pageReader.nextRecord()) {
            Record record = new Record();
            UpdateKey updateKey = new UpdateKey();
            visitor.setRecord(record);
            visitor.setUpdateKey(updateKey);
            for (Column column : pageReader.getSchema().getColumns()) {
              column.visit(visitor);
            }
            records.add(record);
            updateKeys.add(updateKey);
            if (records.size() == UPSERT_BATCH_SIZE) {
              upsert(records, updateKeys);
              records.clear();
              updateKeys.clear();
            }
          }
          if (!records.isEmpty()) {
            upsert(records, updateKeys);
          }
        });
  }

  /**
   * Splits a batch into inserts and updates and sends both in chunks.
   *
   * <p>A record is updated when a record with the same update key value already exists in the
   * app, and inserted otherwise. Records without an update key value are always inserted.
   *
   * @param records records of the batch
   * @param updateKeys update key of each record, in the same order as {@code records}
   * @throws RuntimeException if the two lists differ in size
   * @throws ConfigException if the update key field does not exist in the app or is neither a
   *     single-line text field nor a number field
   */
  private void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys) {
    if (records.size() != updateKeys.size()) {
      throw new RuntimeException("records.size() != updateKeys.size()");
    }
    if (records.isEmpty()) {
      return;
    }
    String updateField = updateKeys.get(0).getField();
    FieldType updateKeyFieldType =
        Optional.ofNullable(client.app().getFormFields(task.getAppId()).get(updateField))
            .map(property -> property.getType())
            .orElseThrow(
                () ->
                    new ConfigException(
                        String.format(
                            "The update_key field '%s' does not exist in the app.",
                            updateField)));
    if (!Arrays.asList(FieldType.SINGLE_LINE_TEXT, FieldType.NUMBER)
        .contains(updateKeyFieldType)) {
      throw new ConfigException("The update_key must be 'SINGLE_LINE_TEXT' or 'NUMBER'.");
    }

    // A set keeps the per-record existence check constant-time for large batches.
    Set<String> existingValues =
        getExistingRecordsByUpdateKey(updateKeys).stream()
            .map(existing -> readKeyValue(existing, updateField, updateKeyFieldType))
            .filter(value -> value != null)
            .collect(Collectors.toSet());

    ArrayList<Record> insertRecords = new ArrayList<>();
    ArrayList<RecordForUpdate> updateRecords = new ArrayList<>();
    for (int i = 0; i < records.size(); i++) {
      Record record = records.get(i);
      UpdateKey updateKey = updateKeys.get(i);
      if (existsRecord(existingValues, updateKey)) {
        record.removeField(updateKey.getField());
        updateRecords.add(new RecordForUpdate(updateKey, record));
      } else {
        insertRecords.add(record);
      }
      if (insertRecords.size() == CHUNK_SIZE) {
        insert(insertRecords);
        insertRecords.clear();
      }
      if (updateRecords.size() == CHUNK_SIZE) {
        update(updateRecords);
        updateRecords.clear();
      }
    }
    if (!insertRecords.isEmpty()) {
      insert(insertRecords);
    }
    if (!updateRecords.isEmpty()) {
      update(updateRecords);
    }
  }

  /**
   * Fetches the records whose update key field matches one of the given keys.
   *
   * <p>Only the update key field of each record is requested. Keys without a value are left out
   * of the query; when no key has a value, no request is made.
   *
   * @param updateKeys update keys of the current batch; must not be empty
   * @return the matching records, possibly empty
   */
  private List<Record> getExistingRecordsByUpdateKey(ArrayList<UpdateKey> updateKeys) {
    String fieldCode = updateKeys.get(0).getField();
    List<String> queryValues =
        updateKeys.stream()
            .filter(key -> !hasNoKeyValue(key))
            .map(key -> quoteQueryValue(key.getValue()))
            .collect(Collectors.toList());
    List<Record> allRecords = new ArrayList<>();
    if (queryValues.isEmpty()) {
      return allRecords;
    }
    String cursorId =
        client
            .record()
            .createCursor(
                task.getAppId(),
                Collections.singletonList(fieldCode),
                fieldCode + " in (" + String.join(",", queryValues) + ")");
    GetRecordsByCursorResponseBody resp;
    do {
      resp = client.record().getRecordsByCursor(cursorId);
      allRecords.addAll(resp.getRecords());
    } while (resp.hasNext());
    return allRecords;
  }

  /** Returns whether the update key's value is one of the values already present in the app. */
  private boolean existsRecord(Set<String> distValues, UpdateKey updateKey) {
    Object value = updateKey.getValue();
    return value != null && distValues.contains(value.toString());
  }

  /** Returns whether the update key carries no usable value (null or empty text). */
  private static boolean hasNoKeyValue(UpdateKey updateKey) {
    Object value = updateKey.getValue();
    return value == null || value.toString().isEmpty();
  }

  /**
   * Wraps a value in double quotes for use in a query, escaping backslashes and double quotes so
   * that the value cannot terminate the quoted literal.
   */
  private static String quoteQueryValue(Object value) {
    String escaped = value.toString().replace("\\", "\\\\").replace("\"", "\\\"");
    return "\"" + escaped + "\"";
  }

  /**
   * Reads the update key field of an existing record as text.
   *
   * @return the field value, or {@code null} when the field is empty or of an unsupported type
   */
  private static String readKeyValue(Record record, String field, FieldType type) {
    switch (type) {
      case SINGLE_LINE_TEXT:
        return record.getSingleLineTextFieldValue(field);
      case NUMBER:
        BigDecimal number = record.getNumberFieldValue(field);
        return number == null ? null : number.toPlainString();
      default:
        return null;
    }
  }
}