src/main/java/org/embulk/output/redshift/RedshiftOutputConnection.java in embulk-output-redshift-0.6.0 vs src/main/java/org/embulk/output/redshift/RedshiftOutputConnection.java in embulk-output-redshift-0.6.1
- old
+ new
@@ -1,7 +1,8 @@
package org.embulk.output.redshift;
+import java.util.List;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import org.slf4j.Logger;
import org.embulk.spi.Exec;
@@ -117,6 +118,54 @@
stmt.executeUpdate(sql);
} finally {
stmt.close();
}
}
+
+ @Override
+ protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("BEGIN TRANSACTION;");
+
+ sb.append("DELETE FROM ");
+ quoteIdentifierString(sb, toTable);
+ sb.append(" USING (");
+ for(int i=0; i < fromTables.size(); i++) {
+ if(i != 0) { sb.append(" UNION ALL "); }
+ sb.append("SELECT * FROM ");
+ quoteIdentifierString(sb, fromTables.get(i));
+ }
+ sb.append(") S WHERE (");
+ for(int i=0; i < mergeKeys.size(); i++) {
+ if (i != 0) { sb.append(" AND "); }
+ sb.append("S.");
+ quoteIdentifierString(sb, mergeKeys.get(i));
+ sb.append(" = ");
+ quoteIdentifierString(sb, toTable);
+ sb.append(".");
+ quoteIdentifierString(sb, mergeKeys.get(i));
+ }
+ sb.append(");");
+
+ sb.append("INSERT INTO ");
+ quoteIdentifierString(sb, toTable);
+ sb.append(" (");
+ for(int i=0; i < fromTables.size(); i++) {
+ if(i != 0) { sb.append(" UNION ALL "); }
+ sb.append("SELECT ");
+ for(int j=0; j < schema.getCount(); j++) {
+ if (j != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, schema.getColumnName(j));
+ }
+ sb.append(" FROM ");
+ quoteIdentifierString(sb, fromTables.get(i));
+ }
+ sb.append(");");
+
+ sb.append("END TRANSACTION;");
+
+ return sb.toString();
+ }
+
}