src/main/java/org/embulk/output/oracle/OracleOutputConnection.java in embulk-output-oracle-0.7.5 vs src/main/java/org/embulk/output/oracle/OracleOutputConnection.java in embulk-output-oracle-0.7.6

- old
+ new

@@ -6,15 +6,17 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.embulk.output.jdbc.JdbcOutputConnection; import org.embulk.output.jdbc.JdbcColumn; import org.embulk.output.jdbc.JdbcSchema; +import org.embulk.output.jdbc.MergeConfig; public class OracleOutputConnection extends JdbcOutputConnection { private static final Map<String, String> CHARSET_NAMES = new HashMap<String, String>(); @@ -192,7 +194,76 @@ { if (Arrays.asList(SIZE_TYPE_NAMES).contains(convertedTypeName)) { return ColumnDeclareType.SIZE; } return super.getColumnDeclareType(convertedTypeName, col); + } + + @Override + protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException + { + StringBuilder sb = new StringBuilder(); + + sb.append("MERGE INTO "); + sb.append(quoteIdentifierString(toTable)); + sb.append(" T"); + sb.append(" USING ("); + for (int i = 0; i < fromTables.size(); i++) { + if (i != 0) { sb.append(" UNION ALL "); } + sb.append(" SELECT "); + sb.append(buildColumns(schema, "")); + sb.append(" FROM "); + sb.append(quoteIdentifierString(fromTables.get(i))); + } + sb.append(") S"); + sb.append(" ON ("); + for (int i = 0; i < mergeConfig.getMergeKeys().size(); i++) { + if (i != 0) { sb.append(" AND "); } + String mergeKey = quoteIdentifierString(mergeConfig.getMergeKeys().get(i)); + sb.append("T."); + sb.append(mergeKey); + sb.append(" = S."); + sb.append(mergeKey); + } + sb.append(")"); + sb.append(" WHEN MATCHED THEN"); + sb.append(" UPDATE SET "); + if (mergeConfig.getMergeRule().isPresent()) { + for (int i = 0; i < mergeConfig.getMergeRule().get().size(); i++) { + if (i != 0) { sb.append(", "); } + sb.append(mergeConfig.getMergeRule().get().get(i)); + } + } else { + int index = 0; + for (int i = 0; i < schema.getCount(); i++) { + String rawColumn = schema.getColumnName(i); + if (mergeConfig.getMergeKeys().contains(rawColumn)) { + continue; + } + if (index++ != 0) { sb.append(", "); } + String column = quoteIdentifierString(rawColumn); + sb.append(column); + sb.append(" = S."); + sb.append(column); + } + } + sb.append(" WHEN NOT MATCHED THEN"); + sb.append(" INSERT ("); + sb.append(buildColumns(schema, "")); + sb.append(") VALUES ("); + sb.append(buildColumns(schema, "S.")); + sb.append(")"); + + return sb.toString(); + } + + private String buildColumns(JdbcSchema schema, String prefix) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < schema.getCount(); i++) { + if (i != 0) { sb.append(", "); } + sb.append(prefix); + sb.append(quoteIdentifierString(schema.getColumnName(i))); + } + return sb.toString(); } }