src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.2.2 vs src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.2.3

- old
+ new

@@ -52,6 +52,57 @@ return "BYTEA"; default: return typeName; } } + + @Override + protected String buildPrepareUpsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException + { + StringBuilder sb = new StringBuilder(); + int size = toTableSchema.getCount(); + String table = quoteIdentifierString(toTable); + int idx = 0; + + sb.append("WITH upsert AS (UPDATE ").append(table).append(" SET "); + + for (int i=0; i < size; i++) { + JdbcColumn c = toTableSchema.getColumn(i); + if (!c.isPrimaryKey()) { + if(idx != 0) { sb.append(", "); } + idx++; + quoteIdentifierString(sb, toTableSchema.getColumnName(i)); + sb.append("=?"); + } + } + + sb.append(" WHERE "); + idx = 0; + for(int i=0; i < size; i++) { + JdbcColumn c = toTableSchema.getColumn(i); + if (c.isPrimaryKey()) { + if(idx != 0) { sb.append(" AND "); } + idx++; + quoteIdentifierString(sb, toTableSchema.getColumnName(i)); + sb.append("=?"); + } + } + sb.append(" RETURNING true as result)"); + + sb.append(" INSERT INTO ").append(table).append(" ("); + for (int i=0; i < size; i++) { + if(i != 0) { sb.append(", "); } + quoteIdentifierString(sb, toTableSchema.getColumnName(i)); + } + sb.append(")"); + + sb.append(" SELECT "); + for (int i=0; i < size; i++) { + if(i != 0) { sb.append(", "); } + sb.append("?"); + } + sb.append(" WHERE (SELECT result FROM upsert) is null"); + + return sb.toString(); + } + }