src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.2.2 vs src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.2.3
- old
+ new
@@ -52,6 +52,57 @@
return "BYTEA";
default:
return typeName;
}
}
+
+ @Override
+ protected String buildPrepareUpsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException
+ {
+ StringBuilder sb = new StringBuilder();
+ int size = toTableSchema.getCount();
+ String table = quoteIdentifierString(toTable);
+ int idx = 0;
+
+ sb.append("WITH upsert AS (UPDATE ").append(table).append(" SET ");
+
+ for (int i=0; i < size; i++) {
+ JdbcColumn c = toTableSchema.getColumn(i);
+ if (!c.isPrimaryKey()) {
+ if(idx != 0) { sb.append(", "); }
+ idx++;
+ quoteIdentifierString(sb, toTableSchema.getColumnName(i));
+ sb.append("=?");
+ }
+ }
+
+ sb.append(" WHERE ");
+ idx = 0;
+ for(int i=0; i < size; i++) {
+ JdbcColumn c = toTableSchema.getColumn(i);
+ if (c.isPrimaryKey()) {
+ if(idx != 0) { sb.append(" AND "); }
+ idx++;
+ quoteIdentifierString(sb, toTableSchema.getColumnName(i));
+ sb.append("=?");
+ }
+ }
+ sb.append(" RETURNING true as result)");
+
+ sb.append(" INSERT INTO ").append(table).append(" (");
+ for (int i=0; i < size; i++) {
+ if(i != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, toTableSchema.getColumnName(i));
+ }
+ sb.append(")");
+
+ sb.append(" SELECT ");
+ for (int i=0; i < size; i++) {
+ if(i != 0) { sb.append(", "); }
+ sb.append("?");
+ }
+ sb.append(" WHERE (SELECT result FROM upsert) is null");
+
+ return sb.toString();
+ }
+
}