src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.6.1 vs src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.6.2
- old
+ new
@@ -2,10 +2,12 @@
import java.util.List;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+
+import org.embulk.output.jdbc.MergeConfig;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import org.embulk.output.jdbc.JdbcOutputConnection;
import org.embulk.output.jdbc.JdbcColumn;
import org.embulk.output.jdbc.JdbcSchema;
@@ -25,11 +27,11 @@
StringBuilder sb = new StringBuilder();
sb.append("COPY ");
quoteIdentifierString(sb, toTable);
sb.append(" (");
- for (int i=0; i < toTableSchema.getCount(); i++) {
+ for (int i = 0; i < toTableSchema.getCount(); i++) {
if (i != 0) { sb.append(", "); }
quoteIdentifierString(sb, toTableSchema.getColumnName(i));
}
sb.append(") ");
sb.append("FROM STDIN");
@@ -41,81 +43,94 @@
{
return new CopyManager((BaseConnection) connection);
}
@Override
- protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException
+ protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException
{
StringBuilder sb = new StringBuilder();
sb.append("WITH updated AS (");
sb.append("UPDATE ");
quoteIdentifierString(sb, toTable);
sb.append(" SET ");
- for (int i=0; i < schema.getCount(); i++) {
- if (i != 0) { sb.append(", "); }
- quoteIdentifierString(sb, schema.getColumnName(i));
- sb.append(" = S.");
- quoteIdentifierString(sb, schema.getColumnName(i));
+ if (mergeConfig.getMergeRule().isPresent()) {
+ List<String> rule = mergeConfig.getMergeRule().get();
+ for (int i = 0; i < rule.size(); i++) {
+ if (i != 0) {
+ sb.append(", ");
+ }
+ sb.append(rule.get(i));
+ }
+ } else {
+ for (int i = 0; i < schema.getCount(); i++) {
+ if (i != 0) {
+ sb.append(", ");
+ }
+ quoteIdentifierString(sb, schema.getColumnName(i));
+ sb.append(" = S.");
+ quoteIdentifierString(sb, schema.getColumnName(i));
+ }
}
sb.append(" FROM (");
- for (int i=0; i < fromTables.size(); i++) {
+ for (int i = 0; i < fromTables.size(); i++) {
if (i != 0) { sb.append(" UNION ALL "); }
sb.append("SELECT ");
- for(int j=0; j < schema.getCount(); j++) {
+ for(int j = 0; j < schema.getCount(); j++) {
if (j != 0) { sb.append(", "); }
quoteIdentifierString(sb, schema.getColumnName(j));
}
sb.append(" FROM ");
quoteIdentifierString(sb, fromTables.get(i));
}
sb.append(") S");
sb.append(" WHERE ");
- for (int i=0; i < mergeKeys.size(); i++) {
+ List<String> mergeKeys = mergeConfig.getMergeKeys();
+ for (int i = 0; i < mergeKeys.size(); i++) {
if (i != 0) { sb.append(" AND "); }
quoteIdentifierString(sb, toTable);
sb.append(".");
quoteIdentifierString(sb, mergeKeys.get(i));
sb.append(" = ");
sb.append("S.");
quoteIdentifierString(sb, mergeKeys.get(i));
}
sb.append(" RETURNING ");
- for (int i=0; i < mergeKeys.size(); i++) {
+ for (int i = 0; i < mergeKeys.size(); i++) {
if (i != 0) { sb.append(", "); }
sb.append("S.");
quoteIdentifierString(sb, mergeKeys.get(i));
}
sb.append(") ");
sb.append("INSERT INTO ");
quoteIdentifierString(sb, toTable);
sb.append(" (");
- for (int i=0; i < schema.getCount(); i++) {
+ for (int i = 0; i < schema.getCount(); i++) {
if (i != 0) { sb.append(", "); }
quoteIdentifierString(sb, schema.getColumnName(i));
}
sb.append(") ");
sb.append("SELECT DISTINCT ON (");
- for (int i=0; i < mergeKeys.size(); i++) {
+ for (int i = 0; i < mergeKeys.size(); i++) {
if (i != 0) { sb.append(", "); }
quoteIdentifierString(sb, mergeKeys.get(i));
}
sb.append(") * FROM (");
- for (int i=0; i < fromTables.size(); i++) {
+ for (int i = 0; i < fromTables.size(); i++) {
if (i != 0) { sb.append(" UNION ALL "); }
sb.append("SELECT ");
- for(int j=0; j < schema.getCount(); j++) {
+ for(int j = 0; j < schema.getCount(); j++) {
if (j != 0) { sb.append(", "); }
quoteIdentifierString(sb, schema.getColumnName(j));
}
sb.append(" FROM ");
quoteIdentifierString(sb, fromTables.get(i));
}
sb.append(") S ");
sb.append("WHERE NOT EXISTS (");
sb.append("SELECT 1 FROM updated WHERE ");
- for (int i=0; i < mergeKeys.size(); i++) {
+ for (int i = 0; i < mergeKeys.size(); i++) {
if (i != 0) { sb.append(" AND "); }
sb.append("S.");
quoteIdentifierString(sb, mergeKeys.get(i));
sb.append(" = ");
sb.append("updated.");