src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.6 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.7
- old
+ new
@@ -292,11 +292,11 @@
stmt.close();
}
}
protected void collectInsert(List<String> fromTables, JdbcSchema schema, String toTable,
- boolean truncateDestinationFirst, Optional<String> additionalSql) throws SQLException
+ boolean truncateDestinationFirst, Optional<String> preSql, Optional<String> postSql) throws SQLException
{
if (fromTables.isEmpty()) {
return;
}
@@ -304,15 +304,22 @@
try {
if (truncateDestinationFirst) {
String sql = buildTruncateSql(toTable);
executeUpdate(stmt, sql);
}
+
+ if (preSql.isPresent()) {
+ executeUpdate(stmt, preSql.get());
+ }
+
String sql = buildCollectInsertSql(fromTables, schema, toTable);
executeUpdate(stmt, sql);
- if (additionalSql.isPresent()) {
- executeUpdate(stmt, additionalSql.get());
+
+ if (postSql.isPresent()) {
+ executeUpdate(stmt, postSql.get());
}
+
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();
@@ -354,23 +361,29 @@
return sb.toString();
}
protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig,
- Optional<String> additionalSql) throws SQLException
+ Optional<String> preSql, Optional<String> postSql) throws SQLException
{
if (fromTables.isEmpty()) {
return;
}
Statement stmt = connection.createStatement();
try {
+ if (preSql.isPresent()) {
+ executeUpdate(stmt, preSql.get());
+ }
+
String sql = buildCollectMergeSql(fromTables, schema, toTable, mergeConfig);
executeUpdate(stmt, sql);
- if (additionalSql.isPresent()) {
- executeUpdate(stmt, additionalSql.get());
+
+ if (postSql.isPresent()) {
+ executeUpdate(stmt, postSql.get());
}
+
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();
@@ -380,18 +393,21 @@
protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException
{
throw new UnsupportedOperationException("not implemented");
}
- public void replaceTable(String fromTable, JdbcSchema schema, String toTable, Optional<String> additionalSql) throws SQLException
+ public void replaceTable(String fromTable, JdbcSchema schema, String toTable, Optional<String> postSql) throws SQLException
{
Statement stmt = connection.createStatement();
try {
dropTableIfExists(stmt, toTable);
+
executeUpdate(stmt, buildRenameTableSql(fromTable, toTable));
- if (additionalSql.isPresent()) {
- executeUpdate(stmt, additionalSql.get());
+
+ if (postSql.isPresent()) {
+ executeUpdate(stmt, postSql.get());
}
+
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();