src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.4.1 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-1.0.0

- old
+ new

@@ -2,28 +2,36 @@ import static org.embulk.spi.util.RetryExecutor.retryExecutor; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; import com.kintone.client.KintoneClient; import com.kintone.client.KintoneClientBuilder; import com.kintone.client.api.record.GetRecordsByCursorResponseBody; import com.kintone.client.exception.KintoneApiRuntimeException; +import com.kintone.client.model.app.field.FieldProperty; import com.kintone.client.model.record.FieldType; import com.kintone.client.model.record.Record; import com.kintone.client.model.record.RecordForUpdate; import com.kintone.client.model.record.UpdateKey; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; import org.embulk.config.ConfigException; import org.embulk.config.TaskReport; -import org.embulk.spi.Column; import org.embulk.spi.Exec; import org.embulk.spi.Page; import org.embulk.spi.PageReader; import org.embulk.spi.Schema; import org.embulk.spi.TransactionalPageOutput; @@ -31,26 +39,29 @@ import org.embulk.spi.util.RetryExecutor.Retryable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KintonePageOutput implements TransactionalPageOutput { - public static final int UPSERT_BATCH_SIZE = 10000; - public static final int CHUNK_SIZE = 100; - private static final Logger LOGGER = LoggerFactory.getLogger(KintonePageOutput.class); + private static final Logger LOGGER = + LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final List<String> RETRYABLE_ERROR_CODES = Arrays.asList( - "GAIA_TM12", // 作成できるカーソルの上限に達しているため、カーソ ルを作成できません。不要なカーソルを削除するか、しばらく経ってから再実行してください。 + "GAIA_TM12", // 作成できるカーソルの上限に達しているため、カーソルを作成できません。不要なカーソルを削除するか、しばらく経ってから再実行してください。 "GAIA_RE18", // データベースのロックに失敗したため、変更を保存できませんでした。時間をおいて再度お試しください。 "GAIA_DA02" // データベースのロックに失敗したため、変更を保存できませんでした。時間をおいて再度お試しください。 ); - private final PageReader pageReader; + private static final int UPSERT_BATCH_SIZE = 10000; + private static final int CHUNK_SIZE = 100; + private final Map<String, Pair<FieldType, FieldType>> wrongTypeFields = new TreeMap<>(); private final PluginTask task; + private final PageReader reader; private KintoneClient client; + private Map<String, FieldProperty> formFields; public KintonePageOutput(PluginTask task, Schema schema) { - this.pageReader = new PageReader(schema); this.task = task; + reader = new PageReader(schema); } @Override public void add(Page page) { KintoneMode mode = KintoneMode.getKintoneModeByValue(task.getMode()); @@ -74,15 +85,15 @@ // noop } @Override public void close() { - if (this.client == null) { - return; + if (client == null) { + return; // Not connected } try { - this.client.close(); + client.close(); } catch (Exception e) { throw new RuntimeException("kintone throw exception", e); } } @@ -91,82 +102,77 @@ // noop } @Override public TaskReport commit() { + wrongTypeFields.forEach( + (key, value) -> + LOGGER.warn( + String.format( + "Field type of %s is expected %s but actual %s", + key, value.getLeft(), value.getRight()))); return Exec.newTaskReport(); } - public interface Consumer<T> { - void accept(T t); - } - - public void connect(final PluginTask task) { + public void connectIfNeeded() { + if (client != null) { + return; // Already connected + } KintoneClientBuilder builder = KintoneClientBuilder.create("https://" + task.getDomain()); if (task.getGuestSpaceId().isPresent()) { - builder.setGuestSpaceId(task.getGuestSpaceId().orElse(-1)); + builder.setGuestSpaceId(task.getGuestSpaceId().get()); } if (task.getBasicAuthUsername().isPresent() && task.getBasicAuthPassword().isPresent()) { builder.withBasicAuth(task.getBasicAuthUsername().get(), task.getBasicAuthPassword().get()); } - if (task.getUsername().isPresent() && task.getPassword().isPresent()) { - this.client = - builder.authByPassword(task.getUsername().get(), task.getPassword().get()).build(); + builder.authByPassword(task.getUsername().get(), task.getPassword().get()); } else if (task.getToken().isPresent()) { - this.client = builder.authByApiToken(task.getToken().get()).build(); + builder.authByApiToken(task.getToken().get()); + } else { + throw new ConfigException("Username and password or token must be configured."); } + client = builder.build(); + formFields = client.app().getFormFields(task.getAppId()); } - private void update(ArrayList<RecordForUpdate> records) { - execute( - client -> { - client.record().updateRecords(task.getAppId(), records); - }); + private void insert(List<Record> records) { + executeWithRetry(() -> client.record().addRecords(task.getAppId(), records)); } - private void insert(ArrayList<Record> records) { - execute( - client -> { - client.record().addRecords(task.getAppId(), records); - }); + private void update(List<RecordForUpdate> records) { + executeWithRetry(() -> client.record().updateRecords(task.getAppId(), records)); } - private void execute(Consumer<KintoneClient> operation) { - connect(task); - if (this.client == null) { - throw new RuntimeException("Failed to connect to kintone."); - } + private <T> T executeWithRetry(Supplier<T> operation) { + connectIfNeeded(); KintoneRetryOption retryOption = task.getRetryOptions(); try { - retryExecutor() + return retryExecutor() .withRetryLimit(retryOption.getLimit()) .withInitialRetryWait(retryOption.getInitialWaitMillis()) .withMaxRetryWait(retryOption.getMaxWaitMillis()) .runInterruptible( - new Retryable<Void>() { - + new Retryable<T>() { @Override - public Void call() throws Exception { - operation.accept(client); - return null; + public T call() throws Exception { + return operation.get(); } @Override - public boolean isRetryableException(Exception e) { - if (!(e instanceof KintoneApiRuntimeException)) { + public boolean isRetryableException(Exception exception) { + if (!(exception instanceof KintoneApiRuntimeException)) { return false; } - try { ObjectMapper mapper = new ObjectMapper(); JsonNode content = - mapper.readTree(((KintoneApiRuntimeException) e).getContent()); + mapper.readTree(((KintoneApiRuntimeException) exception).getContent()); String code = content.get("code").textValue(); return RETRYABLE_ERROR_CODES.contains(code); - } catch (IOException ex) { - throw new RuntimeException(ex); + } catch (IOException e) { + throw new RuntimeException(e); } } @Override public void onRetry( @@ -190,193 +196,199 @@ } catch (RetryGiveupException | InterruptedException e) { throw new RuntimeException("kintone throw exception", e); } } - private void insertPage(final Page page) { - - ArrayList<Record> records = new ArrayList<>(); - pageReader.setPage(page); - KintoneColumnVisitor visitor = new KintoneColumnVisitor(pageReader, task.getColumnOptions()); - while (pageReader.nextRecord()) { + private void insertPage(Page page) { + List<Record> records = new ArrayList<>(); + reader.setPage(page); + KintoneColumnVisitor visitor = + new KintoneColumnVisitor( + reader, task.getColumnOptions(), task.getPreferNulls(), task.getIgnoreNulls()); + while (reader.nextRecord()) { Record record = new Record(); visitor.setRecord(record); - for (Column column : pageReader.getSchema().getColumns()) { - column.visit(visitor); - } - + reader.getSchema().visitColumns(visitor); + putWrongTypeFields(record); records.add(record); if (records.size() == CHUNK_SIZE) { insert(records); records.clear(); } } - if (records.size() > 0) { + if (!records.isEmpty()) { insert(records); } } - private void updatePage(final Page page) { - ArrayList<RecordForUpdate> updateRecords = new ArrayList<>(); - pageReader.setPage(page); - + private void updatePage(Page page) { + List<RecordForUpdate> records = new ArrayList<>(); + reader.setPage(page); KintoneColumnVisitor visitor = new KintoneColumnVisitor( - pageReader, + reader, task.getColumnOptions(), + task.getPreferNulls(), + task.getIgnoreNulls(), task.getUpdateKeyName() .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated - while (pageReader.nextRecord()) { + while (reader.nextRecord()) { Record record = new Record(); UpdateKey updateKey = new UpdateKey(); visitor.setRecord(record); visitor.setUpdateKey(updateKey); - for (Column column : pageReader.getSchema().getColumns()) { - column.visit(visitor); - } - - if (updateKey.getValue() == "") { + reader.getSchema().visitColumns(visitor); + putWrongTypeFields(record); + if (updateKey.getValue() == null || updateKey.getValue().toString().isEmpty()) { + LOGGER.warn("Record skipped because no update key value was specified"); continue; } - - record.removeField(updateKey.getField()); - updateRecords.add(new RecordForUpdate(updateKey, record)); - if (updateRecords.size() == CHUNK_SIZE) { - update(updateRecords); - updateRecords.clear(); + records.add(new RecordForUpdate(updateKey, record.removeField(updateKey.getField()))); + if (records.size() == CHUNK_SIZE) { + update(records); + records.clear(); } } - if (updateRecords.size() > 0) { - update(updateRecords); + if (!records.isEmpty()) { + update(records); } } - private void upsertPage(final Page page) { - execute( - client -> { - ArrayList<Record> records = new ArrayList<>(); - ArrayList<UpdateKey> updateKeys = new ArrayList<>(); - pageReader.setPage(page); - - KintoneColumnVisitor visitor = - new KintoneColumnVisitor( - pageReader, - task.getColumnOptions(), - task.getUpdateKeyName() - .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated - while (pageReader.nextRecord()) { - Record record = new Record(); - UpdateKey updateKey = new UpdateKey(); - visitor.setRecord(record); - visitor.setUpdateKey(updateKey); - for (Column column : pageReader.getSchema().getColumns()) { - column.visit(visitor); - } - records.add(record); - updateKeys.add(updateKey); - - if (records.size() == UPSERT_BATCH_SIZE) { - upsert(records, updateKeys); - records.clear(); - updateKeys.clear(); - } - } - if (records.size() > 0) { - upsert(records, updateKeys); - } - }); + private void upsertPage(Page page) { + List<Record> records = new ArrayList<>(); + List<UpdateKey> updateKeys = new ArrayList<>(); + reader.setPage(page); + KintoneColumnVisitor visitor = + new KintoneColumnVisitor( + reader, + task.getColumnOptions(), + task.getPreferNulls(), + task.getIgnoreNulls(), + task.getUpdateKeyName() + .orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated + while (reader.nextRecord()) { + Record record = new Record(); + UpdateKey updateKey = new UpdateKey(); + visitor.setRecord(record); + visitor.setUpdateKey(updateKey); + reader.getSchema().visitColumns(visitor); + putWrongTypeFields(record); + records.add(record); + updateKeys.add(updateKey); + if (records.size() == UPSERT_BATCH_SIZE) { + upsert(records, updateKeys); + records.clear(); + updateKeys.clear(); + } + } + if (!records.isEmpty()) { + upsert(records, updateKeys); + } } - private void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys) { + private void upsert(List<Record> records, List<UpdateKey> updateKeys) { if (records.size() != updateKeys.size()) { throw new RuntimeException("records.size() != updateKeys.size()"); } - FieldType updateKeyFieldType = - client.app().getFormFields(task.getAppId()).get(updateKeys.get(0).getField()).getType(); - if (!Arrays.asList(FieldType.SINGLE_LINE_TEXT, FieldType.NUMBER).contains(updateKeyFieldType)) { - throw new ConfigException("The update_key must be 'SINGLE_LINE_TEXT' or 'NUMBER'."); - } - - List<Record> existingRecords = getExistingRecordsByUpdateKey(updateKeys); - String updateField = updateKeys.get(0).getField(); - List<String> existingValues = - existingRecords.stream() - .map( - (r) -> { - switch (updateKeyFieldType) { - case SINGLE_LINE_TEXT: - String s = r.getSingleLineTextFieldValue(updateField); - return s == null ? null : s.toString(); - case NUMBER: - BigDecimal bd = r.getNumberFieldValue(updateField); - return bd == null ? null : bd.toPlainString(); - default: - return null; - } - }) - .filter(v -> v != null) - .collect(Collectors.toList()); - - ArrayList<Record> insertRecords = new ArrayList<>(); - ArrayList<RecordForUpdate> updateRecords = new ArrayList<>(); + List<String> existingValues = executeWithRetry(() -> getExistingValuesByUpdateKey(updateKeys)); + List<Record> insertRecords = new ArrayList<>(); + List<RecordForUpdate> updateRecords = new ArrayList<>(); for (int i = 0; i < records.size(); i++) { Record record = records.get(i); UpdateKey updateKey = updateKeys.get(i); - if (existsRecord(existingValues, updateKey)) { - record.removeField(updateKey.getField()); - updateRecords.add(new RecordForUpdate(updateKey, record)); + updateRecords.add(new RecordForUpdate(updateKey, record.removeField(updateKey.getField()))); } else { insertRecords.add(record); } - if (insertRecords.size() == CHUNK_SIZE) { insert(insertRecords); insertRecords.clear(); } else if (updateRecords.size() == CHUNK_SIZE) { update(updateRecords); updateRecords.clear(); } } - if (insertRecords.size() > 0) { + if (!insertRecords.isEmpty()) { insert(insertRecords); } - if (updateRecords.size() > 0) { + if (!updateRecords.isEmpty()) { update(updateRecords); } } - private List<Record> getExistingRecordsByUpdateKey(ArrayList<UpdateKey> updateKeys) { - String fieldCode = updateKeys.get(0).getField(); + private List<String> getExistingValuesByUpdateKey(List<UpdateKey> updateKeys) { + String fieldCode = + updateKeys.stream() + .map(UpdateKey::getField) + .filter(Objects::nonNull) + .findFirst() + .orElse(null); + if (fieldCode == null) { + return Collections.emptyList(); + } + Function<Record, String> fieldValueAsString; + FieldType fieldType = getFieldType(fieldCode); + if (fieldType == FieldType.SINGLE_LINE_TEXT) { + fieldValueAsString = record -> record.getSingleLineTextFieldValue(fieldCode); + } else if (fieldType == FieldType.NUMBER) { + fieldValueAsString = record -> toString(record.getNumberFieldValue(fieldCode)); + } else { + throw new ConfigException("The update_key must be 'SINGLE_LINE_TEXT' or 'NUMBER'."); + } List<String> queryValues = updateKeys.stream() - .filter(k -> k.getValue() != "") - .map(k -> "\"" + k.getValue().toString() + "\"") + .filter(k -> k.getValue() != null && !k.getValue().toString().isEmpty()) + .map(k -> "\"" + k.getValue() + "\"") .collect(Collectors.toList()); - - List<Record> allRecords = new ArrayList<>(); if (queryValues.isEmpty()) { - return allRecords; + return Collections.emptyList(); } String cursorId = client .record() .createCursor( task.getAppId(), Collections.singletonList(fieldCode), fieldCode + " in (" + String.join(",", queryValues) + ")"); + List<Record> allRecords = new ArrayList<>(); while (true) { GetRecordsByCursorResponseBody resp = client.record().getRecordsByCursor(cursorId); List<Record> records = resp.getRecords(); allRecords.addAll(records); - if (!resp.hasNext()) { break; } } - return allRecords; + return allRecords.stream() + .map(fieldValueAsString) + .filter(Objects::nonNull) + .collect(Collectors.toList()); } - private boolean existsRecord(List<String> distValues, UpdateKey updateKey) { - return distValues.stream().anyMatch(v -> v.equals(updateKey.getValue().toString())); + private boolean existsRecord(List<String> existingValues, UpdateKey updateKey) { + String value = toString(updateKey.getValue()); + return value != null && existingValues.stream().anyMatch(v -> v.equals(value)); + } + + private void putWrongTypeFields(Record record) { + record.getFieldCodes(true).stream() + .map( + fieldCode -> + Maps.immutableEntry( + fieldCode, Pair.of(record.getFieldType(fieldCode), getFieldType(fieldCode)))) + .filter(entry -> entry.getValue().getLeft() != entry.getValue().getRight()) + .forEach(entry -> wrongTypeFields.put(entry.getKey(), entry.getValue())); + } + + private FieldType getFieldType(String fieldCode) { + connectIfNeeded(); + FieldProperty field = formFields.get(fieldCode); + return field == null ? null : field.getType(); + } + + private static String toString(Object value) { + return value == null + ? null + : value instanceof BigDecimal ? ((BigDecimal) value).toPlainString() : value.toString(); } }