src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.7.2 vs src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.7.3

- old
+ new

@@ -43,9 +43,89 @@ { return new CopyManager((BaseConnection) connection); } @Override + protected String buildPreparedMergeSql(String toTable, JdbcSchema schema, MergeConfig mergeConfig) throws SQLException + { + StringBuilder sb = new StringBuilder(); + + sb.append("WITH S AS ("); + sb.append("SELECT "); + for (int i = 0; i < schema.getCount(); i++) { + if (i != 0) { sb.append(", "); } + sb.append("? AS "); + quoteIdentifierString(sb, schema.getColumnName(i)); + } + sb.append("),"); + sb.append("updated AS ("); + sb.append("UPDATE "); + quoteIdentifierString(sb, toTable); + sb.append(" SET "); + if (mergeConfig.getMergeRule().isPresent()) { + List<String> rule = mergeConfig.getMergeRule().get(); + for (int i = 0; i < rule.size(); i++) { + if (i != 0) { sb.append(", "); } + sb.append(rule.get(i)); + } + } else { + for (int i = 0; i < schema.getCount(); i++) { + if (i != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(i)); + sb.append(" = S."); + quoteIdentifierString(sb, schema.getColumnName(i)); + } + } + sb.append(" FROM S"); + sb.append(" WHERE "); + List<String> mergeKeys = mergeConfig.getMergeKeys(); + for (int i = 0; i < mergeKeys.size(); i++) { + if (i != 0) { sb.append(" AND "); } + quoteIdentifierString(sb, toTable); + sb.append("."); + quoteIdentifierString(sb, mergeKeys.get(i)); + sb.append(" = "); + sb.append("S."); + quoteIdentifierString(sb, mergeKeys.get(i)); + } + sb.append(" RETURNING "); + for (int i = 0; i < mergeKeys.size(); i++) { + if (i != 0) { sb.append(", "); } + sb.append("S."); + quoteIdentifierString(sb, mergeKeys.get(i)); + } + sb.append(") "); + + sb.append("INSERT INTO "); + quoteIdentifierString(sb, toTable); + sb.append(" ("); + for (int i = 0; i < schema.getCount(); i++) { + if (i != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(i)); + } + sb.append(") "); + sb.append("SELECT "); + for(int i = 0; i < schema.getCount(); i++) { + if (i != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(i)); + } + sb.append(" FROM S "); + sb.append("WHERE NOT EXISTS ("); + sb.append("SELECT 1 FROM updated WHERE "); + for (int i = 0; i < mergeKeys.size(); i++) { + if (i != 0) { sb.append(" AND "); } + sb.append("S."); + quoteIdentifierString(sb, mergeKeys.get(i)); + sb.append(" = "); + sb.append("updated."); + quoteIdentifierString(sb, mergeKeys.get(i)); + } + sb.append(") "); + + return sb.toString(); + } + + @Override protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException { StringBuilder sb = new StringBuilder(); sb.append("WITH updated AS (");