src/main/java/org/embulk/output/redshift/RedshiftOutputConnection.java in embulk-output-redshift-0.6.0 vs src/main/java/org/embulk/output/redshift/RedshiftOutputConnection.java in embulk-output-redshift-0.6.1

- old
+ new

@@ -1,7 +1,8 @@ package org.embulk.output.redshift; +import java.util.List; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; import org.slf4j.Logger; import org.embulk.spi.Exec; @@ -117,6 +118,54 @@ stmt.executeUpdate(sql); } finally { stmt.close(); } } + + @Override + protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException + { + StringBuilder sb = new StringBuilder(); + + sb.append("BEGIN TRANSACTION;"); + + sb.append("DELETE FROM "); + quoteIdentifierString(sb, toTable); + sb.append(" USING ("); + for(int i=0; i < fromTables.size(); i++) { + if(i != 0) { sb.append(" UNION ALL "); } + sb.append("SELECT * FROM "); + quoteIdentifierString(sb, fromTables.get(i)); + } + sb.append(") S WHERE ("); + for(int i=0; i < mergeKeys.size(); i++) { + if (i != 0) { sb.append(" AND "); } + sb.append("S."); + quoteIdentifierString(sb, mergeKeys.get(i)); + sb.append(" = "); + quoteIdentifierString(sb, toTable); + sb.append("."); + quoteIdentifierString(sb, mergeKeys.get(i)); + } + sb.append(");"); + + sb.append("INSERT INTO "); + quoteIdentifierString(sb, toTable); + sb.append(" ("); + for(int i=0; i < fromTables.size(); i++) { + if(i != 0) { sb.append(" UNION ALL "); } + sb.append("SELECT "); + for(int j=0; j < schema.getCount(); j++) { + if (j != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(j)); + } + sb.append(" FROM "); + quoteIdentifierString(sb, fromTables.get(i)); + } + sb.append(");"); + + sb.append("END TRANSACTION;"); + + return sb.toString(); + } + }