src/main/java/org/embulk/output/PostgreSQLOutputPlugin.java in embulk-output-postgresql-0.2.2 vs src/main/java/org/embulk/output/PostgreSQLOutputPlugin.java in embulk-output-postgresql-0.2.3
- old
+ new
@@ -1,17 +1,22 @@
package org.embulk.output;
+import java.util.List;
import java.util.Properties;
import java.io.IOException;
import java.sql.SQLException;
+
+import org.embulk.output.jdbc.setter.ColumnSetter;
+import org.embulk.output.postgresql.PostgresqlBatchUpsert;
import org.embulk.spi.Exec;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.output.jdbc.AbstractJdbcOutputPlugin;
import org.embulk.output.jdbc.BatchInsert;
import org.embulk.output.postgresql.PostgreSQLOutputConnector;
import org.embulk.output.postgresql.PostgreSQLCopyBatchInsert;
+import org.embulk.spi.PageReader;
public class PostgreSQLOutputPlugin
extends AbstractJdbcOutputPlugin
{
public interface PostgreSQLPluginTask
@@ -84,10 +89,50 @@
return new PostgreSQLOutputConnector(url, props, t.getSchema());
}
@Override
+ protected PluginPageOutput newPluginPageOutput(PageReader reader,
+ BatchInsert batch, List<ColumnSetter> columnSetters,
+ int batchSize)
+ {
+ return new PostgresPluginPageOutput(reader, batch, columnSetters, batchSize);
+ }
+
+ public static class PostgresPluginPageOutput extends PluginPageOutput
+ {
+
+ public PostgresPluginPageOutput(PageReader pageReader, BatchInsert batch, List<ColumnSetter> columnSetters, int batchSize) {
+ super(pageReader, batch, columnSetters, batchSize);
+ }
+
+ @Override
+ protected void handleColumnsSetters()
+ {
+ int size = columnSetters.size();
+ for (int i=0; i < size; i++) {
+ ColumnSetter columnSetter = columnSetters.get(i);
+ if (!columnSetter.getColumn().isPrimaryKey()) {
+ columns.get(i).visit(columnSetter);
+ }
+ }
+ for (int i=0; i < size; i++) {
+ ColumnSetter columnSetter = columnSetters.get(i);
+ if (columnSetter.getColumn().isPrimaryKey()) {
+ columns.get(i).visit(columnSetter);
+ }
+ }
+ for (int i=0; i < size; i++) {
+ columns.get(i).visit(columnSetters.get(i));
+ }
+ }
+
+ }
+
+ @Override
protected BatchInsert newBatchInsert(PluginTask task) throws IOException, SQLException
{
- return new PostgreSQLCopyBatchInsert(getConnector(task, true));
+ PostgreSQLOutputConnector connector = getConnector(task, true);
+ return task.getMode().isMerge() ? new PostgresqlBatchUpsert(connector) :
+ new PostgreSQLCopyBatchInsert(getConnector(task, true));
}
}