src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.6.1 vs src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.6.2

- old
+ new

@@ -2,10 +2,12 @@ import java.util.List; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; + +import org.embulk.output.jdbc.MergeConfig; import org.postgresql.copy.CopyManager; import org.postgresql.core.BaseConnection; import org.embulk.output.jdbc.JdbcOutputConnection; import org.embulk.output.jdbc.JdbcColumn; import org.embulk.output.jdbc.JdbcSchema; @@ -25,11 +27,11 @@ StringBuilder sb = new StringBuilder(); sb.append("COPY "); quoteIdentifierString(sb, toTable); sb.append(" ("); - for (int i=0; i < toTableSchema.getCount(); i++) { + for (int i = 0; i < toTableSchema.getCount(); i++) { if (i != 0) { sb.append(", "); } quoteIdentifierString(sb, toTableSchema.getColumnName(i)); } sb.append(") "); sb.append("FROM STDIN"); @@ -41,81 +43,94 @@ { return new CopyManager((BaseConnection) connection); } @Override - protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException + protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException { StringBuilder sb = new StringBuilder(); sb.append("WITH updated AS ("); sb.append("UPDATE "); quoteIdentifierString(sb, toTable); sb.append(" SET "); - for (int i=0; i < schema.getCount(); i++) { - if (i != 0) { sb.append(", "); } - quoteIdentifierString(sb, schema.getColumnName(i)); - sb.append(" = S."); - quoteIdentifierString(sb, schema.getColumnName(i)); + if (mergeConfig.getMergeRule().isPresent()) { + List<String> rule = mergeConfig.getMergeRule().get(); + for (int i = 0; i < rule.size(); i++) { + if (i != 0) { + sb.append(", "); + } + sb.append(rule.get(i)); + } + } else { + for (int i = 0; i < schema.getCount(); i++) { + if (i != 0) { + sb.append(", "); + } + quoteIdentifierString(sb, schema.getColumnName(i)); + sb.append(" = S."); + quoteIdentifierString(sb, schema.getColumnName(i)); + } } sb.append(" FROM ("); - for (int i=0; i < fromTables.size(); i++) { + for (int i = 0; i < fromTables.size(); i++) { if (i != 0) { sb.append(" UNION ALL "); } sb.append("SELECT "); - for(int j=0; j < schema.getCount(); j++) { + for(int j = 0; j < schema.getCount(); j++) { if (j != 0) { sb.append(", "); } quoteIdentifierString(sb, schema.getColumnName(j)); } sb.append(" FROM "); quoteIdentifierString(sb, fromTables.get(i)); } sb.append(") S"); sb.append(" WHERE "); - for (int i=0; i < mergeKeys.size(); i++) { + List<String> mergeKeys = mergeConfig.getMergeKeys(); + for (int i = 0; i < mergeKeys.size(); i++) { if (i != 0) { sb.append(" AND "); } quoteIdentifierString(sb, toTable); sb.append("."); quoteIdentifierString(sb, mergeKeys.get(i)); sb.append(" = "); sb.append("S."); quoteIdentifierString(sb, mergeKeys.get(i)); } sb.append(" RETURNING "); - for (int i=0; i < mergeKeys.size(); i++) { + for (int i = 0; i < mergeKeys.size(); i++) { if (i != 0) { sb.append(", "); } sb.append("S."); quoteIdentifierString(sb, mergeKeys.get(i)); } sb.append(") "); sb.append("INSERT INTO "); quoteIdentifierString(sb, toTable); sb.append(" ("); - for (int i=0; i < schema.getCount(); i++) { + for (int i = 0; i < schema.getCount(); i++) { if (i != 0) { sb.append(", "); } quoteIdentifierString(sb, schema.getColumnName(i)); } sb.append(") "); sb.append("SELECT DISTINCT ON ("); - for (int i=0; i < mergeKeys.size(); i++) { + for (int i = 0; i < mergeKeys.size(); i++) { if (i != 0) { sb.append(", "); } quoteIdentifierString(sb, mergeKeys.get(i)); } sb.append(") * FROM ("); - for (int i=0; i < fromTables.size(); i++) { + for (int i = 0; i < fromTables.size(); i++) { if (i != 0) { sb.append(" UNION ALL "); } sb.append("SELECT "); - for(int j=0; j < schema.getCount(); j++) { + for(int j = 0; j < schema.getCount(); j++) { if (j != 0) { sb.append(", "); } quoteIdentifierString(sb, schema.getColumnName(j)); } sb.append(" FROM "); quoteIdentifierString(sb, fromTables.get(i)); } sb.append(") S "); sb.append("WHERE NOT EXISTS ("); sb.append("SELECT 1 FROM updated WHERE "); - for (int i=0; i < mergeKeys.size(); i++) { + for (int i = 0; i < mergeKeys.size(); i++) { if (i != 0) { sb.append(" AND "); } sb.append("S."); quoteIdentifierString(sb, mergeKeys.get(i)); sb.append(" = "); sb.append("updated.");