src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.3.3 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.3.6
- old
+ new
@@ -5,322 +5,315 @@
import com.kintone.client.api.record.GetRecordsByCursorResponseBody;
import com.kintone.client.model.record.FieldType;
import com.kintone.client.model.record.Record;
import com.kintone.client.model.record.RecordForUpdate;
import com.kintone.client.model.record.UpdateKey;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.embulk.config.ConfigException;
import org.embulk.config.TaskReport;
import org.embulk.spi.Column;
import org.embulk.spi.Exec;
import org.embulk.spi.Page;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
+public class KintonePageOutput implements TransactionalPageOutput {
+ public static final int UPSERT_BATCH_SIZE = 10000;
+ public static final int CHUNK_SIZE = 100;
+ private final PageReader pageReader;
+ private final PluginTask task;
+ private KintoneClient client;
-public class KintonePageOutput
- implements TransactionalPageOutput
-{
- public static final int UPSERT_BATCH_SIZE = 10000;
- public static final int CHUNK_SIZE = 100;
- private final PageReader pageReader;
- private final PluginTask task;
- private KintoneClient client;
+ public KintonePageOutput(PluginTask task, Schema schema) {
+ this.pageReader = new PageReader(schema);
+ this.task = task;
+ }
- public KintonePageOutput(PluginTask task, Schema schema)
- {
- this.pageReader = new PageReader(schema);
- this.task = task;
+ @Override
+ public void add(Page page) {
+ KintoneMode mode = KintoneMode.getKintoneModeByValue(task.getMode());
+ switch (mode) {
+ case INSERT:
+ insertPage(page);
+ break;
+ case UPDATE:
+ updatePage(page);
+ break;
+ case UPSERT:
+ upsertPage(page);
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("Unknown mode '%s'", task.getMode()));
}
+ }
- @Override
- public void add(Page page)
- {
- KintoneMode mode = KintoneMode.getKintoneModeByValue(task.getMode());
- switch (mode) {
- case INSERT:
- insertPage(page);
- break;
- case UPDATE:
- updatePage(page);
- break;
- case UPSERT:
- upsertPage(page);
- break;
- default:
- throw new UnsupportedOperationException(String.format(
- "Unknown mode '%s'",
- task.getMode()));
- }
- }
+ @Override
+ public void finish() {
+ // noop
+ }
- @Override
- public void finish()
- {
- // noop
+ @Override
+ public void close() {
+ if (this.client == null) {
+ return;
}
-
- @Override
- public void close()
- {
- if (this.client == null) {
- return;
- }
- try {
- this.client.close();
- }
- catch (Exception e) {
- throw new RuntimeException("kintone throw exception", e);
- }
+ try {
+ this.client.close();
+ } catch (Exception e) {
+ throw new RuntimeException("kintone throw exception", e);
}
+ }
- @Override
- public void abort()
- {
- // noop
- }
+ @Override
+ public void abort() {
+ // noop
+ }
- @Override
- public TaskReport commit()
- {
- return Exec.newTaskReport();
- }
+ @Override
+ public TaskReport commit() {
+ return Exec.newTaskReport();
+ }
- public interface Consumer<T>
- {
- void accept(T t);
+ public interface Consumer<T> {
+ void accept(T t);
+ }
+
+ public void connect(final PluginTask task) {
+ KintoneClientBuilder builder = KintoneClientBuilder.create("https://" + task.getDomain());
+ if (task.getGuestSpaceId().isPresent()) {
+ builder.setGuestSpaceId(task.getGuestSpaceId().orElse(-1));
}
+ if (task.getBasicAuthUsername().isPresent() && task.getBasicAuthPassword().isPresent()) {
+ builder.withBasicAuth(task.getBasicAuthUsername().get(), task.getBasicAuthPassword().get());
+ }
- public void connect(final PluginTask task)
- {
- KintoneClientBuilder builder = KintoneClientBuilder.create("https://" + task.getDomain());
- if (task.getGuestSpaceId().isPresent()) {
- builder.setGuestSpaceId(task.getGuestSpaceId().orElse(-1));
- }
- if (task.getBasicAuthUsername().isPresent() && task.getBasicAuthPassword().isPresent()) {
- builder.withBasicAuth(task.getBasicAuthUsername().get(),
- task.getBasicAuthPassword().get());
- }
-
- if (task.getUsername().isPresent() && task.getPassword().isPresent()) {
- this.client = builder
- .authByPassword(task.getUsername().get(), task.getPassword().get())
- .build();
- }
- else if (task.getToken().isPresent()) {
- this.client = builder
- .authByApiToken(task.getToken().get())
- .build();
- }
+ if (task.getUsername().isPresent() && task.getPassword().isPresent()) {
+ this.client =
+ builder.authByPassword(task.getUsername().get(), task.getPassword().get()).build();
+ } else if (task.getToken().isPresent()) {
+ this.client = builder.authByApiToken(task.getToken().get()).build();
}
+ }
- private void execute(Consumer<KintoneClient> operation)
- {
- connect(task);
- if (this.client != null) {
- operation.accept(this.client);
- } else {
- throw new RuntimeException("Failed to connect to kintone.");
- }
+ private void execute(Consumer<KintoneClient> operation) {
+ connect(task);
+ if (this.client != null) {
+ operation.accept(this.client);
+ } else {
+ throw new RuntimeException("Failed to connect to kintone.");
}
+ }
- private void insertPage(final Page page)
- {
- execute(client -> {
- try {
- ArrayList<Record> records = new ArrayList<>();
- pageReader.setPage(page);
- KintoneColumnVisitor visitor = new KintoneColumnVisitor(pageReader,
- task.getColumnOptions());
- while (pageReader.nextRecord()) {
- Record record = new Record();
- visitor.setRecord(record);
- for (Column column : pageReader.getSchema().getColumns()) {
- column.visit(visitor);
- }
+ private void insertPage(final Page page) {
+ execute(
+ client -> {
+ try {
+ ArrayList<Record> records = new ArrayList<>();
+ pageReader.setPage(page);
+ KintoneColumnVisitor visitor =
+ new KintoneColumnVisitor(pageReader, task.getColumnOptions());
+ while (pageReader.nextRecord()) {
+ Record record = new Record();
+ visitor.setRecord(record);
+ for (Column column : pageReader.getSchema().getColumns()) {
+ column.visit(visitor);
+ }
- records.add(record);
- if (records.size() == CHUNK_SIZE) {
- client.record().addRecords(task.getAppId(), records);
- records.clear();
- }
- }
- if (records.size() > 0) {
- client.record().addRecords(task.getAppId(), records);
- }
+ records.add(record);
+ if (records.size() == CHUNK_SIZE) {
+ client.record().addRecords(task.getAppId(), records);
+ records.clear();
+ }
}
- catch (Exception e) {
- throw new RuntimeException("kintone throw exception", e);
+ if (records.size() > 0) {
+ client.record().addRecords(task.getAppId(), records);
}
+ } catch (Exception e) {
+ throw new RuntimeException("kintone throw exception", e);
+ }
});
- }
+ }
- private void updatePage(final Page page)
- {
- execute(client -> {
- try {
- ArrayList<RecordForUpdate> updateRecords = new ArrayList<>();
- pageReader.setPage(page);
+ private void updatePage(final Page page) {
+ execute(
+ client -> {
+ try {
+ ArrayList<RecordForUpdate> updateRecords = new ArrayList<>();
+ pageReader.setPage(page);
- KintoneColumnVisitor visitor = new KintoneColumnVisitor(pageReader,
- task.getColumnOptions(),
- task.getUpdateKeyName().orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated
- while (pageReader.nextRecord()) {
- Record record = new Record();
- UpdateKey updateKey = new UpdateKey();
- visitor.setRecord(record);
- visitor.setUpdateKey(updateKey);
- for (Column column : pageReader.getSchema().getColumns()) {
- column.visit(visitor);
- }
+ KintoneColumnVisitor visitor =
+ new KintoneColumnVisitor(
+ pageReader,
+ task.getColumnOptions(),
+ task.getUpdateKeyName()
+ .orElseThrow(
+ () -> new RuntimeException("unreachable"))); // Already validated
+ while (pageReader.nextRecord()) {
+ Record record = new Record();
+ UpdateKey updateKey = new UpdateKey();
+ visitor.setRecord(record);
+ visitor.setUpdateKey(updateKey);
+ for (Column column : pageReader.getSchema().getColumns()) {
+ column.visit(visitor);
+ }
- if (updateKey.getValue() == "") {
- continue;
- }
+ if (updateKey.getValue() == "") {
+ continue;
+ }
- record.removeField(updateKey.getField());
- updateRecords.add(new RecordForUpdate(updateKey, record));
- if (updateRecords.size() == CHUNK_SIZE) {
- client.record().updateRecords(task.getAppId(), updateRecords);
- updateRecords.clear();
- }
- }
- if (updateRecords.size() > 0) {
- client.record().updateRecords(task.getAppId(), updateRecords);
- }
+ record.removeField(updateKey.getField());
+ updateRecords.add(new RecordForUpdate(updateKey, record));
+ if (updateRecords.size() == CHUNK_SIZE) {
+ client.record().updateRecords(task.getAppId(), updateRecords);
+ updateRecords.clear();
+ }
}
- catch (Exception e) {
- throw new RuntimeException("kintone throw exception", e);
+ if (updateRecords.size() > 0) {
+ client.record().updateRecords(task.getAppId(), updateRecords);
}
+ } catch (Exception e) {
+ throw new RuntimeException("kintone throw exception", e);
+ }
});
- }
+ }
- private void upsertPage(final Page page)
- {
- execute(client -> {
- try {
- ArrayList<Record> records = new ArrayList<>();
- ArrayList<UpdateKey> updateKeys = new ArrayList<>();
- pageReader.setPage(page);
+ private void upsertPage(final Page page) {
+ execute(
+ client -> {
+ try {
+ ArrayList<Record> records = new ArrayList<>();
+ ArrayList<UpdateKey> updateKeys = new ArrayList<>();
+ pageReader.setPage(page);
- KintoneColumnVisitor visitor = new KintoneColumnVisitor(pageReader,
- task.getColumnOptions(),
- task.getUpdateKeyName().orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated
- while (pageReader.nextRecord()) {
- Record record = new Record();
- UpdateKey updateKey = new UpdateKey();
- visitor.setRecord(record);
- visitor.setUpdateKey(updateKey);
- for (Column column : pageReader.getSchema().getColumns()) {
- column.visit(visitor);
- }
- records.add(record);
- updateKeys.add(updateKey);
+ KintoneColumnVisitor visitor =
+ new KintoneColumnVisitor(
+ pageReader,
+ task.getColumnOptions(),
+ task.getUpdateKeyName()
+ .orElseThrow(
+ () -> new RuntimeException("unreachable"))); // Already validated
+ while (pageReader.nextRecord()) {
+ Record record = new Record();
+ UpdateKey updateKey = new UpdateKey();
+ visitor.setRecord(record);
+ visitor.setUpdateKey(updateKey);
+ for (Column column : pageReader.getSchema().getColumns()) {
+ column.visit(visitor);
+ }
+ records.add(record);
+ updateKeys.add(updateKey);
- if (records.size() == UPSERT_BATCH_SIZE) {
- upsert(records, updateKeys);
- records.clear();
- updateKeys.clear();
- }
- }
- if (records.size() > 0) {
- upsert(records, updateKeys);
- }
+ if (records.size() == UPSERT_BATCH_SIZE) {
+ upsert(records, updateKeys);
+ records.clear();
+ updateKeys.clear();
+ }
}
- catch (Exception e) {
- throw new RuntimeException("kintone throw exception", e);
+ if (records.size() > 0) {
+ upsert(records, updateKeys);
}
+ } catch (Exception e) {
+ throw new RuntimeException("kintone throw exception", e);
+ }
});
+ }
+
+ private void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys) {
+ if (records.size() != updateKeys.size()) {
+ throw new RuntimeException("records.size() != updateKeys.size()");
}
+ FieldType updateKeyFieldType =
+ client.app().getFormFields(task.getAppId()).get(updateKeys.get(0).getField()).getType();
+ if (!Arrays.asList(FieldType.SINGLE_LINE_TEXT, FieldType.NUMBER).contains(updateKeyFieldType)) {
+ throw new ConfigException("The update_key must be 'SINGLE_LINE_TEXT' or 'NUMBER'.");
+ }
- private void upsert(ArrayList<Record> records, ArrayList<UpdateKey> updateKeys)
- {
- if (records.size() != updateKeys.size()) {
- throw new RuntimeException("records.size() != updateKeys.size()");
- }
+ List<Record> existingRecords = getExistingRecordsByUpdateKey(updateKeys);
+ String updateField = updateKeys.get(0).getField();
+ List<String> existingValues =
+ existingRecords.stream()
+ .map(
+ (r) -> {
+ switch (updateKeyFieldType) {
+ case SINGLE_LINE_TEXT:
+ String s = r.getSingleLineTextFieldValue(updateField);
+ return s == null ? null : s.toString();
+ case NUMBER:
+ BigDecimal bd = r.getNumberFieldValue(updateField);
+ return bd == null ? null : bd.toPlainString();
+ default:
+ return null;
+ }
+ })
+ .filter(v -> v != null)
+ .collect(Collectors.toList());
- List<Record> existingRecords = getExistingRecordsByUpdateKey(updateKeys);
+ ArrayList<Record> insertRecords = new ArrayList<>();
+ ArrayList<RecordForUpdate> updateRecords = new ArrayList<>();
+ for (int i = 0; i < records.size(); i++) {
+ Record record = records.get(i);
+ UpdateKey updateKey = updateKeys.get(i);
- ArrayList<Record> insertRecords = new ArrayList<>();
- ArrayList<RecordForUpdate> updateRecords = new ArrayList<>();
- for (int i = 0; i < records.size(); i++) {
- Record record = records.get(i);
- UpdateKey updateKey = updateKeys.get(i);
+ if (existsRecord(existingValues, updateKey)) {
+ record.removeField(updateKey.getField());
+ updateRecords.add(new RecordForUpdate(updateKey, record));
+ } else {
+ insertRecords.add(record);
+ }
- if (existsRecord(existingRecords, updateKey)) {
- record.removeField(updateKey.getField());
- updateRecords.add(new RecordForUpdate(updateKey, record));
- }
- else {
- insertRecords.add(record);
- }
-
- if (insertRecords.size() == CHUNK_SIZE) {
- client.record().addRecords(task.getAppId(), insertRecords);
- insertRecords.clear();
- }
- else if (updateRecords.size() == CHUNK_SIZE) {
- client.record().updateRecords(task.getAppId(), updateRecords);
- updateRecords.clear();
- }
- }
- if (insertRecords.size() > 0) {
- client.record().addRecords(task.getAppId(), insertRecords);
- }
- if (updateRecords.size() > 0) {
- client.record().updateRecords(task.getAppId(), updateRecords);
- }
+ if (insertRecords.size() == CHUNK_SIZE) {
+ client.record().addRecords(task.getAppId(), insertRecords);
+ insertRecords.clear();
+ } else if (updateRecords.size() == CHUNK_SIZE) {
+ client.record().updateRecords(task.getAppId(), updateRecords);
+ updateRecords.clear();
+ }
}
+ if (insertRecords.size() > 0) {
+ client.record().addRecords(task.getAppId(), insertRecords);
+ }
+ if (updateRecords.size() > 0) {
+ client.record().updateRecords(task.getAppId(), updateRecords);
+ }
+ }
+ private List<Record> getExistingRecordsByUpdateKey(ArrayList<UpdateKey> updateKeys) {
+ String fieldCode = updateKeys.get(0).getField();
+ List<String> queryValues =
+ updateKeys.stream()
+ .filter(k -> k.getValue() != "")
+ .map(k -> "\"" + k.getValue().toString() + "\"")
+ .collect(Collectors.toList());
- private List<Record> getExistingRecordsByUpdateKey(ArrayList<UpdateKey> updateKeys)
- {
- String fieldCode = updateKeys.get(0).getField();
- List<String> queryValues = updateKeys
- .stream()
- .filter(k -> k.getValue() != "")
- .map(k -> "\"" + k.getValue().toString() + "\"")
- .collect(Collectors.toList());
-
- List<Record> allRecords = new ArrayList<>();
- if (queryValues.isEmpty()) {
- return allRecords;
- }
- String cursorId = client.record().createCursor(
- task.getAppId(),
- Collections.singletonList(fieldCode),
- fieldCode + " in (" + String.join(",", queryValues) + ")"
- );
- while (true) {
- GetRecordsByCursorResponseBody resp = client.record().getRecordsByCursor(cursorId);
- List<Record> records = resp.getRecords();
- allRecords.addAll(records);
-
- if (!resp.hasNext()) {
- break;
- }
- }
- return allRecords;
+ List<Record> allRecords = new ArrayList<>();
+ if (queryValues.isEmpty()) {
+ return allRecords;
}
+ String cursorId =
+ client
+ .record()
+ .createCursor(
+ task.getAppId(),
+ Collections.singletonList(fieldCode),
+ fieldCode + " in (" + String.join(",", queryValues) + ")");
+ while (true) {
+ GetRecordsByCursorResponseBody resp = client.record().getRecordsByCursor(cursorId);
+ List<Record> records = resp.getRecords();
+ allRecords.addAll(records);
- private boolean existsRecord(List<Record> distRecords, UpdateKey updateKey)
- {
- String fieldCode = updateKey.getField();
- FieldType type = client.app().getFormFields(task.getAppId()).get(fieldCode).getType();
- switch (type) {
- case SINGLE_LINE_TEXT:
- return distRecords
- .stream()
- .anyMatch(d -> d.getSingleLineTextFieldValue(fieldCode).equals(updateKey.getValue().toString()));
- case NUMBER:
- return distRecords
- .stream()
- .anyMatch(d -> d.getNumberFieldValue(fieldCode).toPlainString()
- .equals(updateKey.getValue().toString()));
- default:
- throw new RuntimeException("The update_key must be 'SINGLE_LINE_TEXT' or 'NUMBER'.");
- }
+ if (!resp.hasNext()) {
+ break;
+ }
}
+ return allRecords;
+ }
+
+ private boolean existsRecord(List<String> distValues, UpdateKey updateKey) {
+ return distValues.stream().anyMatch(v -> v.equals(updateKey.getValue().toString()));
+ }
}