src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.2.4 vs src/main/java/org/embulk/output/postgresql/PostgreSQLOutputConnection.java in embulk-output-postgresql-0.3.0

- old
+ new

@@ -1,7 +1,8 @@ package org.embulk.output.postgresql; +import java.util.List; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; import org.postgresql.copy.CopyManager; import org.postgresql.core.BaseConnection; @@ -26,11 +27,11 @@ sb.append("COPY "); quoteIdentifierString(sb, toTable); sb.append(" ("); for (int i=0; i < toTableSchema.getCount(); i++) { - if(i != 0) { sb.append(", "); } + if (i != 0) { sb.append(", "); } quoteIdentifierString(sb, toTableSchema.getColumnName(i)); } sb.append(") "); sb.append("FROM STDIN"); @@ -41,68 +42,116 @@ { return new CopyManager((BaseConnection) connection); } @Override - protected String convertTypeName(String typeName) + protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException { - switch(typeName) { - case "CLOB": - return "TEXT"; - case "BLOB": - return "BYTEA"; - default: - return typeName; - } - } - - @Override - protected String buildPrepareUpsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException - { StringBuilder sb = new StringBuilder(); - int size = toTableSchema.getCount(); - String table = quoteIdentifierString(toTable); - int idx = 0; - sb.append("WITH upsert AS (UPDATE ").append(table).append(" SET "); - - for (int i=0; i < size; i++) { - JdbcColumn c = toTableSchema.getColumn(i); - if (!c.isPrimaryKey()) { - if(idx != 0) { sb.append(", "); } - idx++; - quoteIdentifierString(sb, toTableSchema.getColumnName(i)); - sb.append("=?"); + sb.append("WITH updated AS ("); + sb.append("UPDATE "); + quoteIdentifierString(sb, toTable); + sb.append(" SET "); + for (int i=0; i < schema.getCount(); i++) { + if (i != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(i)); + sb.append(" = S."); + quoteIdentifierString(sb, schema.getColumnName(i)); + } + sb.append(" FROM ("); + for (int i=0; i < fromTables.size(); i++) { + if (i != 0) { sb.append(" UNION ALL "); } + sb.append("SELECT "); + for(int j=0; j < schema.getCount(); j++) { + if (j != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(j)); } + sb.append(" FROM "); + quoteIdentifierString(sb, fromTables.get(i)); } - + sb.append(") S"); sb.append(" WHERE "); - idx = 0; - for(int i=0; i < size; i++) { - JdbcColumn c = toTableSchema.getColumn(i); - if (c.isPrimaryKey()) { - if(idx != 0) { sb.append(" AND "); } - idx++; - quoteIdentifierString(sb, toTableSchema.getColumnName(i)); - sb.append("=?"); - } + for (int i=0; i < mergeKeys.size(); i++) { + if (i != 0) { sb.append(" AND "); } + quoteIdentifierString(sb, toTable); + sb.append("."); + quoteIdentifierString(sb, mergeKeys.get(i)); + sb.append(" = "); + sb.append("S."); + quoteIdentifierString(sb, mergeKeys.get(i)); } - sb.append(" RETURNING true as result)"); - - sb.append(" INSERT INTO ").append(table).append(" ("); - for (int i=0; i < size; i++) { - if(i != 0) { sb.append(", "); } - quoteIdentifierString(sb, toTableSchema.getColumnName(i)); + sb.append(" RETURNING "); + for (int i=0; i < mergeKeys.size(); i++) { + if (i != 0) { sb.append(", "); } + sb.append("S."); + quoteIdentifierString(sb, mergeKeys.get(i)); } - sb.append(")"); + sb.append(") "); - sb.append(" SELECT "); - for (int i=0; i < size; i++) { - if(i != 0) { sb.append(", "); } - sb.append("?"); + sb.append("INSERT INTO "); + quoteIdentifierString(sb, toTable); + sb.append(" ("); + for (int i=0; i < schema.getCount(); i++) { + if (i != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(i)); } - sb.append(" WHERE (SELECT result FROM upsert) is null"); + sb.append(") "); + sb.append("SELECT DISTINCT ON ("); + for (int i=0; i < mergeKeys.size(); i++) { + if (i != 0) { sb.append(", "); } + quoteIdentifierString(sb, mergeKeys.get(i)); + } + sb.append(") * FROM ("); + for (int i=0; i < fromTables.size(); i++) { + if (i != 0) { sb.append(" UNION ALL "); } + sb.append("SELECT "); + for(int j=0; j < schema.getCount(); j++) { + if (j != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(j)); + } + sb.append(" FROM "); + quoteIdentifierString(sb, fromTables.get(i)); + } + sb.append(") S "); + sb.append("WHERE NOT EXISTS ("); + sb.append("SELECT 1 FROM updated WHERE "); + for (int i=0; i < mergeKeys.size(); i++) { + if (i != 0) { sb.append(" AND "); } + sb.append("S."); + quoteIdentifierString(sb, mergeKeys.get(i)); + sb.append(" = "); + sb.append("updated."); + quoteIdentifierString(sb, mergeKeys.get(i)); + } + sb.append(") "); return sb.toString(); } + protected void collectReplaceView(List<String> fromTables, JdbcSchema schema, String toTable) throws SQLException + { + Statement stmt = connection.createStatement(); + try { + String sql = buildCollectInsertSql(fromTables, schema, toTable); + executeUpdate(stmt, sql); + commitIfNecessary(connection); + } catch (SQLException ex) { + throw safeRollback(connection, ex); + } finally { + stmt.close(); + } + } + + @Override + protected String buildColumnTypeName(JdbcColumn c) + { + switch(c.getSimpleTypeName()) { + case "CLOB": + return "TEXT"; + case "BLOB": + return "BYTEA"; + default: + return super.buildColumnTypeName(c); + } + } }