src/main/java/org/embulk/output/oracle/OracleOutputConnection.java in embulk-output-oracle-0.7.5 vs src/main/java/org/embulk/output/oracle/OracleOutputConnection.java in embulk-output-oracle-0.7.6
- old
+ new
@@ -6,15 +6,17 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.embulk.output.jdbc.JdbcOutputConnection;
import org.embulk.output.jdbc.JdbcColumn;
import org.embulk.output.jdbc.JdbcSchema;
+import org.embulk.output.jdbc.MergeConfig;
public class OracleOutputConnection
extends JdbcOutputConnection
{
private static final Map<String, String> CHARSET_NAMES = new HashMap<String, String>();
@@ -192,7 +194,76 @@
{
if (Arrays.asList(SIZE_TYPE_NAMES).contains(convertedTypeName)) {
return ColumnDeclareType.SIZE;
}
return super.getColumnDeclareType(convertedTypeName, col);
+ }
+
+ @Override
+ protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("MERGE INTO ");
+ sb.append(quoteIdentifierString(toTable));
+ sb.append(" T");
+ sb.append(" USING (");
+ for (int i = 0; i < fromTables.size(); i++) {
+ if (i != 0) { sb.append(" UNION ALL "); }
+ sb.append(" SELECT ");
+ sb.append(buildColumns(schema, ""));
+ sb.append(" FROM ");
+ sb.append(quoteIdentifierString(fromTables.get(i)));
+ }
+ sb.append(") S");
+ sb.append(" ON (");
+ for (int i = 0; i < mergeConfig.getMergeKeys().size(); i++) {
+ if (i != 0) { sb.append(" AND "); }
+ String mergeKey = quoteIdentifierString(mergeConfig.getMergeKeys().get(i));
+ sb.append("T.");
+ sb.append(mergeKey);
+ sb.append(" = S.");
+ sb.append(mergeKey);
+ }
+ sb.append(")");
+ sb.append(" WHEN MATCHED THEN");
+ sb.append(" UPDATE SET ");
+ if (mergeConfig.getMergeRule().isPresent()) {
+ for (int i = 0; i < mergeConfig.getMergeRule().get().size(); i++) {
+ if (i != 0) { sb.append(", "); }
+ sb.append(mergeConfig.getMergeRule().get().get(i));
+ }
+ } else {
+ int index = 0;
+ for (int i = 0; i < schema.getCount(); i++) {
+ String rawColumn = schema.getColumnName(i);
+ if (mergeConfig.getMergeKeys().contains(rawColumn)) {
+ continue;
+ }
+ if (index++ != 0) { sb.append(", "); }
+ String column = quoteIdentifierString(rawColumn);
+ sb.append(column);
+ sb.append(" = S.");
+ sb.append(column);
+ }
+ }
+ sb.append(" WHEN NOT MATCHED THEN");
+ sb.append(" INSERT (");
+ sb.append(buildColumns(schema, ""));
+ sb.append(") VALUES (");
+ sb.append(buildColumns(schema, "S."));
+ sb.append(")");
+
+ return sb.toString();
+ }
+
+ private String buildColumns(JdbcSchema schema, String prefix)
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < schema.getCount(); i++) {
+ if (i != 0) { sb.append(", "); }
+ sb.append(prefix);
+ sb.append(quoteIdentifierString(schema.getColumnName(i)));
+ }
+ return sb.toString();
}
}