src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.7.2 vs src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.7.3
- old
+ new
@@ -43,9 +43,89 @@
{
return new CopyManager((BaseConnection) connection);
}
@Override
+ protected String buildPreparedMergeSql(String toTable, JdbcSchema schema, MergeConfig mergeConfig) throws SQLException
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("WITH S AS (");
+ sb.append("SELECT ");
+ for (int i = 0; i < schema.getCount(); i++) {
+ if (i != 0) { sb.append(", "); }
+ sb.append("? AS ");
+ quoteIdentifierString(sb, schema.getColumnName(i));
+ }
+ sb.append("),");
+ sb.append("updated AS (");
+ sb.append("UPDATE ");
+ quoteIdentifierString(sb, toTable);
+ sb.append(" SET ");
+ if (mergeConfig.getMergeRule().isPresent()) {
+ List<String> rule = mergeConfig.getMergeRule().get();
+ for (int i = 0; i < rule.size(); i++) {
+ if (i != 0) { sb.append(", "); }
+ sb.append(rule.get(i));
+ }
+ } else {
+ for (int i = 0; i < schema.getCount(); i++) {
+ if (i != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, schema.getColumnName(i));
+ sb.append(" = S.");
+ quoteIdentifierString(sb, schema.getColumnName(i));
+ }
+ }
+ sb.append(" FROM S");
+ sb.append(" WHERE ");
+ List<String> mergeKeys = mergeConfig.getMergeKeys();
+ for (int i = 0; i < mergeKeys.size(); i++) {
+ if (i != 0) { sb.append(" AND "); }
+ quoteIdentifierString(sb, toTable);
+ sb.append(".");
+ quoteIdentifierString(sb, mergeKeys.get(i));
+ sb.append(" = ");
+ sb.append("S.");
+ quoteIdentifierString(sb, mergeKeys.get(i));
+ }
+ sb.append(" RETURNING ");
+ for (int i = 0; i < mergeKeys.size(); i++) {
+ if (i != 0) { sb.append(", "); }
+ sb.append("S.");
+ quoteIdentifierString(sb, mergeKeys.get(i));
+ }
+ sb.append(") ");
+
+ sb.append("INSERT INTO ");
+ quoteIdentifierString(sb, toTable);
+ sb.append(" (");
+ for (int i = 0; i < schema.getCount(); i++) {
+ if (i != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, schema.getColumnName(i));
+ }
+ sb.append(") ");
+ sb.append("SELECT ");
+ for(int i = 0; i < schema.getCount(); i++) {
+ if (i != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, schema.getColumnName(i));
+ }
+ sb.append(" FROM S ");
+ sb.append("WHERE NOT EXISTS (");
+ sb.append("SELECT 1 FROM updated WHERE ");
+ for (int i = 0; i < mergeKeys.size(); i++) {
+ if (i != 0) { sb.append(" AND "); }
+ sb.append("S.");
+ quoteIdentifierString(sb, mergeKeys.get(i));
+ sb.append(" = ");
+ sb.append("updated.");
+ quoteIdentifierString(sb, mergeKeys.get(i));
+ }
+ sb.append(") ");
+
+ return sb.toString();
+ }
+
+ @Override
protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException
{
StringBuilder sb = new StringBuilder();
sb.append("WITH updated AS (");