src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.6 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.7

- old
+ new

@@ -292,11 +292,11 @@ stmt.close(); } } protected void collectInsert(List<String> fromTables, JdbcSchema schema, String toTable, - boolean truncateDestinationFirst, Optional<String> additionalSql) throws SQLException + boolean truncateDestinationFirst, Optional<String> preSql, Optional<String> postSql) throws SQLException { if (fromTables.isEmpty()) { return; } @@ -304,15 +304,22 @@ try { if (truncateDestinationFirst) { String sql = buildTruncateSql(toTable); executeUpdate(stmt, sql); } + + if (preSql.isPresent()) { + executeUpdate(stmt, preSql.get()); + } + String sql = buildCollectInsertSql(fromTables, schema, toTable); executeUpdate(stmt, sql); - if (additionalSql.isPresent()) { - executeUpdate(stmt, additionalSql.get()); + + if (postSql.isPresent()) { + executeUpdate(stmt, postSql.get()); } + commitIfNecessary(connection); } catch (SQLException ex) { throw safeRollback(connection, ex); } finally { stmt.close(); @@ -354,23 +361,29 @@ return sb.toString(); } protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig, - Optional<String> additionalSql) throws SQLException + Optional<String> preSql, Optional<String> postSql) throws SQLException { if (fromTables.isEmpty()) { return; } Statement stmt = connection.createStatement(); try { + if (preSql.isPresent()) { + executeUpdate(stmt, preSql.get()); + } + String sql = buildCollectMergeSql(fromTables, schema, toTable, mergeConfig); executeUpdate(stmt, sql); - if (additionalSql.isPresent()) { - executeUpdate(stmt, additionalSql.get()); + + if (postSql.isPresent()) { + executeUpdate(stmt, postSql.get()); } + commitIfNecessary(connection); } catch (SQLException ex) { throw safeRollback(connection, ex); } finally { stmt.close(); @@ -380,18 +393,21 @@ protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException { throw new UnsupportedOperationException("not implemented"); } - public void replaceTable(String fromTable, JdbcSchema schema, String toTable, Optional<String> additionalSql) throws SQLException + public void replaceTable(String fromTable, JdbcSchema schema, String toTable, Optional<String> postSql) throws SQLException { Statement stmt = connection.createStatement(); try { dropTableIfExists(stmt, toTable); + executeUpdate(stmt, buildRenameTableSql(fromTable, toTable)); - if (additionalSql.isPresent()) { - executeUpdate(stmt, additionalSql.get()); + + if (postSql.isPresent()) { + executeUpdate(stmt, postSql.get()); } + commitIfNecessary(connection); } catch (SQLException ex) { throw safeRollback(connection, ex); } finally { stmt.close();