src/main/java/org/embulk/output/redshift/RedshiftOutputConnection.java in embulk-output-redshift-0.7.0 vs src/main/java/org/embulk/output/redshift/RedshiftOutputConnection.java in embulk-output-redshift-0.7.1
- old
+ new
@@ -1,19 +1,22 @@
package org.embulk.output.redshift;
-import java.util.List;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
-import org.embulk.output.jdbc.MergeConfig;
-import org.slf4j.Logger;
-import org.embulk.spi.Exec;
-import org.embulk.output.jdbc.JdbcOutputConnection;
import org.embulk.output.jdbc.JdbcColumn;
+import org.embulk.output.jdbc.JdbcOutputConnection;
import org.embulk.output.jdbc.JdbcSchema;
+import org.embulk.output.jdbc.MergeConfig;
+import org.embulk.spi.Exec;
+import org.slf4j.Logger;
+import com.google.common.base.Optional;
+
public class RedshiftOutputConnection
extends JdbcOutputConnection
{
private final Logger logger = Exec.getLogger(RedshiftOutputConnection.class);
@@ -44,11 +47,11 @@
}
// Redshift does not support DROP TABLE IF EXISTS.
// Dropping part runs DROP TABLE and ignores errors.
@Override
- public void replaceTable(String fromTable, JdbcSchema schema, String toTable) throws SQLException
+ public void replaceTable(String fromTable, JdbcSchema schema, String toTable, Optional<String> additionalSql) throws SQLException
{
Statement stmt = connection.createStatement();
try {
try {
StringBuilder sb = new StringBuilder();
@@ -71,10 +74,14 @@
quoteIdentifierString(sb, toTable);
String sql = sb.toString();
executeUpdate(stmt, sql);
}
+ if (additionalSql.isPresent()) {
+ executeUpdate(stmt, additionalSql.get());
+ }
+
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();
@@ -126,54 +133,93 @@
@Override
protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException
{
StringBuilder sb = new StringBuilder();
+ List<String> mergeKeys = mergeConfig.getMergeKeys();
+
+ List<String> updateKeys = new ArrayList<String>();
+ for (int i = 0; i < schema.getCount(); i++) {
+ String updateKey = schema.getColumnName(i);
+ if (!mergeKeys.contains(updateKey)) {
+ updateKeys.add(updateKey);
+ }
+ }
+
sb.append("BEGIN TRANSACTION;");
- sb.append("DELETE FROM ");
+ sb.append("UPDATE ");
quoteIdentifierString(sb, toTable);
- sb.append(" USING (");
+ sb.append(" SET ");
+ for (int i = 0; i < updateKeys.size(); i++) {
+ if (i != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, updateKeys.get(i).toString());
+ sb.append(" = ");
+ sb.append("S.");
+ quoteIdentifierString(sb, updateKeys.get(i).toString());
+ }
+ sb.append(" FROM ( ");
for (int i = 0; i < fromTables.size(); i++) {
if (i != 0) { sb.append(" UNION ALL "); }
- sb.append("SELECT * FROM ");
+ sb.append(" SELECT ");
+ for (int j = 0; j < schema.getCount(); j++) {
+ if (j != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, schema.getColumnName(j));
+ }
+ sb.append(" FROM ");
quoteIdentifierString(sb, fromTables.get(i));
}
- sb.append(") S WHERE (");
- List<String> mergeKeys = mergeConfig.getMergeKeys();
+ sb.append(" ) S WHERE ");
+
for (int i = 0; i < mergeKeys.size(); i++) {
if (i != 0) { sb.append(" AND "); }
sb.append("S.");
quoteIdentifierString(sb, mergeKeys.get(i));
sb.append(" = ");
quoteIdentifierString(sb, toTable);
sb.append(".");
quoteIdentifierString(sb, mergeKeys.get(i));
}
- sb.append(");");
+ sb.append(";");
sb.append("INSERT INTO ");
quoteIdentifierString(sb, toTable);
sb.append(" (");
for (int i = 0; i < schema.getCount(); i++) {
if (i != 0) { sb.append(", "); }
quoteIdentifierString(sb, schema.getColumnName(i));
}
sb.append(") (");
for (int i = 0; i < fromTables.size(); i++) {
- if (i != 0) { sb.append(" UNION ALL "); }
+ if (i != 0) { sb.append(" UNION ALL ("); }
sb.append("SELECT ");
for (int j = 0; j < schema.getCount(); j++) {
if (j != 0) { sb.append(", "); }
quoteIdentifierString(sb, schema.getColumnName(j));
}
sb.append(" FROM ");
quoteIdentifierString(sb, fromTables.get(i));
+ sb.append(" WHERE NOT EXISTS (SELECT 1 FROM ");
+ quoteIdentifierString(sb, toTable);
+ sb.append(" WHERE ");
+
+ for (int k = 0; k < mergeKeys.size(); k++) {
+ if (k != 0) { sb.append(" AND "); }
+ quoteIdentifierString(sb, fromTables.get(i));
+ sb.append(".");
+ quoteIdentifierString(sb, mergeKeys.get(k));
+ sb.append(" = ");
+ quoteIdentifierString(sb, toTable);
+ sb.append(".");
+ quoteIdentifierString(sb, mergeKeys.get(k));
+ }
+ sb.append(")) ");
}
- sb.append(");");
+ sb.append(";");
sb.append("END TRANSACTION;");
return sb.toString();
+
}
-}
+}
\ No newline at end of file