src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.2.4 vs src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.3.0
- old
+ new
@@ -1,7 +1,8 @@
package org.embulk.output.postgresql;
+import java.util.List;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
@@ -26,11 +27,11 @@
sb.append("COPY ");
quoteIdentifierString(sb, toTable);
sb.append(" (");
for (int i=0; i < toTableSchema.getCount(); i++) {
- if(i != 0) { sb.append(", "); }
+ if (i != 0) { sb.append(", "); }
quoteIdentifierString(sb, toTableSchema.getColumnName(i));
}
sb.append(") ");
sb.append("FROM STDIN");
@@ -41,68 +42,116 @@
{
return new CopyManager((BaseConnection) connection);
}
@Override
- protected String convertTypeName(String typeName)
+ protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException
{
- switch(typeName) {
- case "CLOB":
- return "TEXT";
- case "BLOB":
- return "BYTEA";
- default:
- return typeName;
- }
- }
-
- @Override
- protected String buildPrepareUpsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException
- {
StringBuilder sb = new StringBuilder();
- int size = toTableSchema.getCount();
- String table = quoteIdentifierString(toTable);
- int idx = 0;
- sb.append("WITH upsert AS (UPDATE ").append(table).append(" SET ");
-
- for (int i=0; i < size; i++) {
- JdbcColumn c = toTableSchema.getColumn(i);
- if (!c.isPrimaryKey()) {
- if(idx != 0) { sb.append(", "); }
- idx++;
- quoteIdentifierString(sb, toTableSchema.getColumnName(i));
- sb.append("=?");
+ sb.append("WITH updated AS (");
+ sb.append("UPDATE ");
+ quoteIdentifierString(sb, toTable);
+ sb.append(" SET ");
+ for (int i=0; i < schema.getCount(); i++) {
+ if (i != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, schema.getColumnName(i));
+ sb.append(" = S.");
+ quoteIdentifierString(sb, schema.getColumnName(i));
+ }
+ sb.append(" FROM (");
+ for (int i=0; i < fromTables.size(); i++) {
+ if (i != 0) { sb.append(" UNION ALL "); }
+ sb.append("SELECT ");
+ for(int j=0; j < schema.getCount(); j++) {
+ if (j != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, schema.getColumnName(j));
}
+ sb.append(" FROM ");
+ quoteIdentifierString(sb, fromTables.get(i));
}
-
+ sb.append(") S");
sb.append(" WHERE ");
- idx = 0;
- for(int i=0; i < size; i++) {
- JdbcColumn c = toTableSchema.getColumn(i);
- if (c.isPrimaryKey()) {
- if(idx != 0) { sb.append(" AND "); }
- idx++;
- quoteIdentifierString(sb, toTableSchema.getColumnName(i));
- sb.append("=?");
- }
+ for (int i=0; i < mergeKeys.size(); i++) {
+ if (i != 0) { sb.append(" AND "); }
+ quoteIdentifierString(sb, toTable);
+ sb.append(".");
+ quoteIdentifierString(sb, mergeKeys.get(i));
+ sb.append(" = ");
+ sb.append("S.");
+ quoteIdentifierString(sb, mergeKeys.get(i));
}
- sb.append(" RETURNING true as result)");
-
- sb.append(" INSERT INTO ").append(table).append(" (");
- for (int i=0; i < size; i++) {
- if(i != 0) { sb.append(", "); }
- quoteIdentifierString(sb, toTableSchema.getColumnName(i));
+ sb.append(" RETURNING ");
+ for (int i=0; i < mergeKeys.size(); i++) {
+ if (i != 0) { sb.append(", "); }
+ sb.append("S.");
+ quoteIdentifierString(sb, mergeKeys.get(i));
}
- sb.append(")");
+ sb.append(") ");
- sb.append(" SELECT ");
- for (int i=0; i < size; i++) {
- if(i != 0) { sb.append(", "); }
- sb.append("?");
+ sb.append("INSERT INTO ");
+ quoteIdentifierString(sb, toTable);
+ sb.append(" (");
+ for (int i=0; i < schema.getCount(); i++) {
+ if (i != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, schema.getColumnName(i));
}
- sb.append(" WHERE (SELECT result FROM upsert) is null");
+ sb.append(") ");
+ sb.append("SELECT DISTINCT ON (");
+ for (int i=0; i < mergeKeys.size(); i++) {
+ if (i != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, mergeKeys.get(i));
+ }
+ sb.append(") * FROM (");
+ for (int i=0; i < fromTables.size(); i++) {
+ if (i != 0) { sb.append(" UNION ALL "); }
+ sb.append("SELECT ");
+ for(int j=0; j < schema.getCount(); j++) {
+ if (j != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, schema.getColumnName(j));
+ }
+ sb.append(" FROM ");
+ quoteIdentifierString(sb, fromTables.get(i));
+ }
+ sb.append(") S ");
+ sb.append("WHERE NOT EXISTS (");
+ sb.append("SELECT 1 FROM updated WHERE ");
+ for (int i=0; i < mergeKeys.size(); i++) {
+ if (i != 0) { sb.append(" AND "); }
+ sb.append("S.");
+ quoteIdentifierString(sb, mergeKeys.get(i));
+ sb.append(" = ");
+ sb.append("updated.");
+ quoteIdentifierString(sb, mergeKeys.get(i));
+ }
+ sb.append(") ");
return sb.toString();
}
+ protected void collectReplaceView(List<String> fromTables, JdbcSchema schema, String toTable) throws SQLException
+ {
+ Statement stmt = connection.createStatement();
+ try {
+ String sql = buildCollectInsertSql(fromTables, schema, toTable);
+ executeUpdate(stmt, sql);
+ commitIfNecessary(connection);
+ } catch (SQLException ex) {
+ throw safeRollback(connection, ex);
+ } finally {
+ stmt.close();
+ }
+ }
+
+ @Override
+ protected String buildColumnTypeName(JdbcColumn c)
+ {
+ switch(c.getSimpleTypeName()) {
+ case "CLOB":
+ return "TEXT";
+ case "BLOB":
+ return "BYTEA";
+ default:
+ return super.buildColumnTypeName(c);
+ }
+ }
}