src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.1.1 vs src/main/java/org/embulk/output/kintone/KintonePageOutput.java in embulk-output-kintone-0.2.0
- old
+ new
@@ -1,49 +1,60 @@
package org.embulk.output.kintone;
-import com.cybozu.kintone.client.authentication.Auth;
-import com.cybozu.kintone.client.connection.Connection;
-import com.cybozu.kintone.client.model.record.field.FieldValue;
-import com.cybozu.kintone.client.module.record.Record;
+import com.kintone.client.KintoneClient;
+import com.kintone.client.KintoneClientBuilder;
+import com.kintone.client.api.record.GetRecordsByCursorResponseBody;
+import com.kintone.client.model.record.Record;
+import com.kintone.client.model.record.RecordForUpdate;
+import com.kintone.client.model.record.UpdateKey;
import org.embulk.config.TaskReport;
import org.embulk.spi.Column;
import org.embulk.spi.Exec;
import org.embulk.spi.Page;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
+import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
public class KintonePageOutput
implements TransactionalPageOutput
{
private PageReader pageReader;
private PluginTask task;
- private Connection conn;
+ private KintoneClient client;
public KintonePageOutput(PluginTask task, Schema schema)
{
this.pageReader = new PageReader(schema);
this.task = task;
}
@Override
public void add(Page page)
{
- switch (task.getMode()) {
+ KintoneMode mode = KintoneMode.getKintoneModeByValue(task.getMode());
+ switch (mode) {
case INSERT:
insertPage(page);
break;
case UPDATE:
- // TODO updatePage
+ updatePage(page);
+ break;
case UPSERT:
- // TODO upsertPage
+ upsertPage(page);
+ break;
default:
- throw new UnsupportedOperationException(
- "kintone output plugin does not support update, upsert");
+ throw new UnsupportedOperationException(String.format(
+ "Unknown mode '%s'",
+ task.getMode()));
}
}
@Override
public void finish()
@@ -52,11 +63,19 @@
}
@Override
public void close()
{
- // noop
+ if (this.client == null) {
+ return;
+ }
+ try {
+ this.client.close();
+ }
+ catch (Exception e) {
+ throw new RuntimeException("kintone throw exception", e);
+ }
}
@Override
public void abort()
{
@@ -74,63 +93,244 @@
public void accept(T t);
}
public void connect(final PluginTask task)
{
- Auth kintoneAuth = new Auth();
- if (task.getUsername().isPresent() && task.getPassword().isPresent()) {
- kintoneAuth.setPasswordAuth(task.getUsername().get(), task.getPassword().get());
+ KintoneClientBuilder builder = KintoneClientBuilder.create("https://" + task.getDomain());
+ if (task.getGuestSpaceId().isPresent()) {
+ builder.setGuestSpaceId(task.getGuestSpaceId().or(-1));
}
- else if (task.getToken().isPresent()) {
- kintoneAuth.setApiToken(task.getToken().get());
- }
-
if (task.getBasicAuthUsername().isPresent() && task.getBasicAuthPassword().isPresent()) {
- kintoneAuth.setBasicAuth(task.getBasicAuthUsername().get(),
+ builder.withBasicAuth(task.getBasicAuthUsername().get(),
task.getBasicAuthPassword().get());
}
- if (task.getGuestSpaceId().isPresent()) {
- this.conn = new Connection(task.getDomain(), kintoneAuth, task.getGuestSpaceId().or(-1));
+ if (task.getUsername().isPresent() && task.getPassword().isPresent()) {
+ this.client = builder
+ .authByPassword(task.getUsername().get(), task.getPassword().get())
+ .build();
}
- else {
- this.conn = new Connection(task.getDomain(), kintoneAuth);
+ else if (task.getToken().isPresent()) {
+ this.client = builder
+ .authByApiToken(task.getToken().get())
+ .build();
}
}
- private void execute(Consumer<Connection> operation)
+ private void execute(Consumer<KintoneClient> operation)
{
connect(task);
- operation.accept(this.conn);
+ operation.accept(this.client);
}
private void insertPage(final Page page)
{
- execute(conn -> {
+ execute(client -> {
try {
- ArrayList<HashMap<String, FieldValue>> records = new ArrayList<>();
+ ArrayList<Record> records = new ArrayList<>();
pageReader.setPage(page);
KintoneColumnVisitor visitor = new KintoneColumnVisitor(pageReader,
task.getColumnOptions());
- Record kintoneRecordManager = new Record(conn);
while (pageReader.nextRecord()) {
- HashMap record = new HashMap();
+ Record record = new Record();
visitor.setRecord(record);
for (Column column : pageReader.getSchema().getColumns()) {
column.visit(visitor);
}
+
records.add(record);
if (records.size() == 100) {
- kintoneRecordManager.addRecords(task.getAppId(), records);
+ client.record().addRecords(task.getAppId(), records);
records.clear();
}
}
if (records.size() > 0) {
- kintoneRecordManager.addRecords(task.getAppId(), records);
+ client.record().addRecords(task.getAppId(), records);
}
}
catch (Exception e) {
throw new RuntimeException("kintone throw exception", e);
}
});
+ }
+
+ private void updatePage(final Page page)
+ {
+ execute(client -> {
+ try {
+ ArrayList<RecordForUpdate> updateRecords = new ArrayList<RecordForUpdate>();
+ pageReader.setPage(page);
+ KintoneColumnVisitor visitor = new KintoneColumnVisitor(pageReader,
+ task.getColumnOptions());
+ while (pageReader.nextRecord()) {
+ Record record = new Record();
+ UpdateKey updateKey = new UpdateKey();
+ visitor.setRecord(record);
+ visitor.setUpdateKey(updateKey);
+ for (Column column : pageReader.getSchema().getColumns()) {
+ column.visit(visitor);
+ }
+
+ record.removeField(updateKey.getField());
+ updateRecords.add(new RecordForUpdate(updateKey, record));
+ if (updateRecords.size() == 100) {
+ client.record().updateRecords(task.getAppId(), updateRecords);
+ updateRecords.clear();
+ }
+ }
+ if (updateRecords.size() > 0) {
+ client.record().updateRecords(task.getAppId(), updateRecords);
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException("kintone throw exception", e);
+ }
+ });
+ }
+
+ private List<Record> getAllRecords(String fieldCode)
+ {
+ List<Record> allRecords = new ArrayList<Record>();
+ List<String> fields = Arrays.asList(fieldCode);
+ String cursorId = client.record().createCursor(task.getAppId(), fields, null);
+ while (true) {
+ GetRecordsByCursorResponseBody resp = client.record().getRecordsByCursor(cursorId);
+ List<Record> records = resp.getRecords();
+ allRecords.addAll(records);
+
+ if (!resp.hasNext()) {
+ break;
+ }
+ }
+ return allRecords;
+ }
+
+ abstract class UpsertPage<T>
+ {
+ public abstract List<T> getUpdateKeyValues();
+ public abstract boolean existsRecord(List<T> updateKeyValues, Record record);
+
+ public void run(final Page page)
+ {
+ execute(client -> {
+ try {
+ List<T> updateKeyValues = getUpdateKeyValues();
+
+ ArrayList<Record> insertRecords = new ArrayList<>();
+ ArrayList<RecordForUpdate> updateRecords = new ArrayList<RecordForUpdate>();
+ pageReader.setPage(page);
+ KintoneColumnVisitor visitor = new KintoneColumnVisitor(pageReader,
+ task.getColumnOptions());
+ while (pageReader.nextRecord()) {
+ Record record = new Record();
+ UpdateKey updateKey = new UpdateKey();
+ visitor.setRecord(record);
+ visitor.setUpdateKey(updateKey);
+ for (Column column : pageReader.getSchema().getColumns()) {
+ column.visit(visitor);
+ }
+
+ if (existsRecord(updateKeyValues, record)) {
+ record.removeField(updateKey.getField());
+ updateRecords.add(new RecordForUpdate(updateKey, record));
+ }
+ else {
+ insertRecords.add(record);
+ }
+
+ if (insertRecords.size() == 100) {
+ client.record().addRecords(task.getAppId(), insertRecords);
+ insertRecords.clear();
+ }
+ else if (updateRecords.size() == 100) {
+ client.record().updateRecords(task.getAppId(), updateRecords);
+ updateRecords.clear();
+ }
+ }
+
+ if (insertRecords.size() > 0) {
+ client.record().addRecords(task.getAppId(), insertRecords);
+ }
+ if (updateRecords.size() > 0) {
+ client.record().updateRecords(task.getAppId(), updateRecords);
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException("kintone throw exception", e);
+ }
+ });
+ }
+ }
+
+ class UpsertPageByStringKey extends UpsertPage<String>
+ {
+ private String fieldCode;
+
+ public UpsertPageByStringKey(String fieldCode)
+ {
+ this.fieldCode = fieldCode;
+ }
+
+ public List<String> getUpdateKeyValues()
+ {
+ return getAllRecords(fieldCode)
+ .stream()
+ .map(r -> r.getSingleLineTextFieldValue(fieldCode))
+ .collect(Collectors.toList());
+ }
+
+ public boolean existsRecord(List<String> updateKeyValues, Record record)
+ {
+ return updateKeyValues.contains(record.getSingleLineTextFieldValue(fieldCode));
+ }
+ }
+
+ class UpsertPageByNumberKey extends UpsertPage<BigDecimal>
+ {
+ private String fieldCode;
+
+ public UpsertPageByNumberKey(String fieldCode)
+ {
+ this.fieldCode = fieldCode;
+ }
+
+ public List<BigDecimal> getUpdateKeyValues()
+ {
+ return getAllRecords(fieldCode)
+ .stream()
+ .map(r -> r.getNumberFieldValue(fieldCode))
+ .collect(Collectors.toList());
+ }
+
+ public boolean existsRecord(List<BigDecimal> updateKeyValues, Record record)
+ {
+ return updateKeyValues.contains(record.getNumberFieldValue(fieldCode));
+ }
+ }
+
+ private void upsertPage(final Page page)
+ {
+ KintoneColumnOption updateKeyColumn = null;
+ for (KintoneColumnOption v : task.getColumnOptions().values()) {
+ if (v.getUpdateKey()) {
+ updateKeyColumn = v;
+ break;
+ }
+ }
+ if (updateKeyColumn == null) {
+ throw new RuntimeException("when mode is upsert, require update_key");
+ }
+
+ UpsertPage runner;
+ switch(updateKeyColumn.getType()) {
+ case "SINGLE_LINE_TEXT":
+ runner = new UpsertPageByStringKey(updateKeyColumn.getFieldCode());
+ break;
+ case "NUMBER":
+ runner = new UpsertPageByNumberKey(updateKeyColumn.getFieldCode());
+ break;
+ default:
+ throw new RuntimeException("The update_key must be 'SINGLE_LINE_TEXT' or 'NUMBER'.");
+ }
+ runner.run(page);
}
}