src/main/java/org/embulk/output/PostgreSQLOutputPlugin.java in embulk-output-postgresql-0.2.2 vs src/main/java/org/embulk/output/PostgreSQLOutputPlugin.java in embulk-output-postgresql-0.2.3

- old
+ new

@@ -1,17 +1,22 @@ package org.embulk.output; +import java.util.List; import java.util.Properties; import java.io.IOException; import java.sql.SQLException; + +import org.embulk.output.jdbc.setter.ColumnSetter; +import org.embulk.output.postgresql.PostgresqlBatchUpsert; import org.embulk.spi.Exec; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.output.jdbc.AbstractJdbcOutputPlugin; import org.embulk.output.jdbc.BatchInsert; import org.embulk.output.postgresql.PostgreSQLOutputConnector; import org.embulk.output.postgresql.PostgreSQLCopyBatchInsert; +import org.embulk.spi.PageReader; public class PostgreSQLOutputPlugin extends AbstractJdbcOutputPlugin { public interface PostgreSQLPluginTask @@ -84,10 +89,50 @@ return new PostgreSQLOutputConnector(url, props, t.getSchema()); } @Override + protected PluginPageOutput newPluginPageOutput(PageReader reader, + BatchInsert batch, List<ColumnSetter> columnSetters, + int batchSize) + { + return new PostgresPluginPageOutput(reader, batch, columnSetters, batchSize); + } + + public static class PostgresPluginPageOutput extends PluginPageOutput + { + + public PostgresPluginPageOutput(PageReader pageReader, BatchInsert batch, List<ColumnSetter> columnSetters, int batchSize) { + super(pageReader, batch, columnSetters, batchSize); + } + + @Override + protected void handleColumnsSetters() + { + int size = columnSetters.size(); + for (int i=0; i < size; i++) { + ColumnSetter columnSetter = columnSetters.get(i); + if (!columnSetter.getColumn().isPrimaryKey()) { + columns.get(i).visit(columnSetter); + } + } + for (int i=0; i < size; i++) { + ColumnSetter columnSetter = columnSetters.get(i); + if (columnSetter.getColumn().isPrimaryKey()) { + columns.get(i).visit(columnSetter); + } + } + for (int i=0; i < size; i++) { + columns.get(i).visit(columnSetters.get(i)); + } + } + + } + + @Override protected BatchInsert newBatchInsert(PluginTask task) throws IOException, SQLException { - return new PostgreSQLCopyBatchInsert(getConnector(task, true)); + PostgreSQLOutputConnector connector = getConnector(task, true); + return task.getMode().isMerge() ? new PostgresqlBatchUpsert(connector) : + new PostgreSQLCopyBatchInsert(getConnector(task, true)); } }