src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-1.0.0 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-1.1.0
- old
+ new
@@ -48,11 +48,10 @@
"GAIA_TM12", // 作成できるカーソルの上限に達しているため、カーソルを作成できません。不要なカーソルを削除するか、しばらく経ってから再実行してください。
"GAIA_RE18", // データベースのロックに失敗したため、変更を保存できませんでした。時間をおいて再度お試しください。
"GAIA_DA02" // データベースのロックに失敗したため、変更を保存できませんでした。時間をおいて再度お試しください。
);
private static final int UPSERT_BATCH_SIZE = 10000;
- private static final int CHUNK_SIZE = 100;
private final Map<String, Pair<FieldType, FieldType>> wrongTypeFields = new TreeMap<>();
private final PluginTask task;
private final PageReader reader;
private KintoneClient client;
private Map<String, FieldProperty> formFields;
@@ -111,11 +110,11 @@
"Field type of %s is expected %s but actual %s",
key, value.getLeft(), value.getRight())));
return Exec.newTaskReport();
}
- public void connectIfNeeded() {
+ private void connectIfNeeded() {
if (client != null) {
return; // Already connected
}
KintoneClientBuilder builder = KintoneClientBuilder.create("https://" + task.getDomain());
if (task.getGuestSpaceId().isPresent()) {
@@ -201,18 +200,23 @@
private void insertPage(Page page) {
List<Record> records = new ArrayList<>();
reader.setPage(page);
KintoneColumnVisitor visitor =
new KintoneColumnVisitor(
- reader, task.getColumnOptions(), task.getPreferNulls(), task.getIgnoreNulls());
+ reader,
+ task.getDerivedColumns(),
+ task.getColumnOptions(),
+ task.getPreferNulls(),
+ task.getIgnoreNulls(),
+ task.getReduceKeyName().orElse(null));
while (reader.nextRecord()) {
Record record = new Record();
visitor.setRecord(record);
reader.getSchema().visitColumns(visitor);
putWrongTypeFields(record);
records.add(record);
- if (records.size() == CHUNK_SIZE) {
+ if (records.size() == task.getChunkSize()) {
insert(records);
records.clear();
}
}
if (!records.isEmpty()) {
@@ -224,13 +228,15 @@
List<RecordForUpdate> records = new ArrayList<>();
reader.setPage(page);
KintoneColumnVisitor visitor =
new KintoneColumnVisitor(
reader,
+ task.getDerivedColumns(),
task.getColumnOptions(),
task.getPreferNulls(),
task.getIgnoreNulls(),
+ task.getReduceKeyName().orElse(null),
task.getUpdateKeyName()
.orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated
while (reader.nextRecord()) {
Record record = new Record();
UpdateKey updateKey = new UpdateKey();
@@ -241,11 +247,11 @@
if (updateKey.getValue() == null || updateKey.getValue().toString().isEmpty()) {
LOGGER.warn("Record skipped because no update key value was specified");
continue;
}
records.add(new RecordForUpdate(updateKey, record.removeField(updateKey.getField())));
- if (records.size() == CHUNK_SIZE) {
+ if (records.size() == task.getChunkSize()) {
update(records);
records.clear();
}
}
if (!records.isEmpty()) {
@@ -258,13 +264,15 @@
List<UpdateKey> updateKeys = new ArrayList<>();
reader.setPage(page);
KintoneColumnVisitor visitor =
new KintoneColumnVisitor(
reader,
+ task.getDerivedColumns(),
task.getColumnOptions(),
task.getPreferNulls(),
task.getIgnoreNulls(),
+ task.getReduceKeyName().orElse(null),
task.getUpdateKeyName()
.orElseThrow(() -> new RuntimeException("unreachable"))); // Already validated
while (reader.nextRecord()) {
Record record = new Record();
UpdateKey updateKey = new UpdateKey();
@@ -298,14 +306,14 @@
if (existsRecord(existingValues, updateKey)) {
updateRecords.add(new RecordForUpdate(updateKey, record.removeField(updateKey.getField())));
} else {
insertRecords.add(record);
}
- if (insertRecords.size() == CHUNK_SIZE) {
+ if (insertRecords.size() == task.getChunkSize()) {
insert(insertRecords);
insertRecords.clear();
- } else if (updateRecords.size() == CHUNK_SIZE) {
+ } else if (updateRecords.size() == task.getChunkSize()) {
update(updateRecords);
updateRecords.clear();
}
}
if (!insertRecords.isEmpty()) {
@@ -363,15 +371,10 @@
.map(fieldValueAsString)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
- private boolean existsRecord(List<String> existingValues, UpdateKey updateKey) {
- String value = toString(updateKey.getValue());
- return value != null && existingValues.stream().anyMatch(v -> v.equals(value));
- }
-
private void putWrongTypeFields(Record record) {
record.getFieldCodes(true).stream()
.map(
fieldCode ->
Maps.immutableEntry(
@@ -382,9 +385,14 @@
private FieldType getFieldType(String fieldCode) {
connectIfNeeded();
FieldProperty field = formFields.get(fieldCode);
return field == null ? null : field.getType();
+ }
+
+ private static boolean existsRecord(List<String> existingValues, UpdateKey updateKey) {
+ String value = toString(updateKey.getValue());
+ return value != null && existingValues.stream().anyMatch(v -> v.equals(value));
}
private static String toString(Object value) {
return value == null
? null