src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.0 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.1
- old
+ new
@@ -278,21 +278,41 @@
protected String buildPreparedMergeSql(String toTable, JdbcSchema toTableSchema, MergeConfig mergeConfig) throws SQLException
{
throw new UnsupportedOperationException("not implemented");
}
+ protected void executeSql(String sql) throws SQLException
+ {
+ Statement stmt = connection.createStatement();
+ try {
+ executeUpdate(stmt, sql);
+ commitIfNecessary(connection);
+ } catch (SQLException ex) {
+ throw safeRollback(connection, ex);
+ } finally {
+ stmt.close();
+ }
+ }
+
protected void collectInsert(List<String> fromTables, JdbcSchema schema, String toTable,
- boolean truncateDestinationFirst) throws SQLException
+ boolean truncateDestinationFirst, Optional<String> additionalSql) throws SQLException
{
+ if (fromTables.isEmpty()) {
+ return;
+ }
+
Statement stmt = connection.createStatement();
try {
if (truncateDestinationFirst) {
String sql = buildTruncateSql(toTable);
executeUpdate(stmt, sql);
}
String sql = buildCollectInsertSql(fromTables, schema, toTable);
executeUpdate(stmt, sql);
+ if (additionalSql.isPresent()) {
+ executeUpdate(stmt, additionalSql.get());
+ }
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();
@@ -333,16 +353,24 @@
}
return sb.toString();
}
- protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException
+ protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig,
+ Optional<String> additionalSql) throws SQLException
{
+ if (fromTables.isEmpty()) {
+ return;
+ }
+
Statement stmt = connection.createStatement();
try {
String sql = buildCollectMergeSql(fromTables, schema, toTable, mergeConfig);
executeUpdate(stmt, sql);
+ if (additionalSql.isPresent()) {
+ executeUpdate(stmt, additionalSql.get());
+ }
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();
@@ -352,17 +380,18 @@
protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException
{
throw new UnsupportedOperationException("not implemented");
}
- public void replaceTable(String fromTable, JdbcSchema schema, String toTable) throws SQLException
+ public void replaceTable(String fromTable, JdbcSchema schema, String toTable, Optional<String> additionalSql) throws SQLException
{
Statement stmt = connection.createStatement();
try {
dropTableIfExists(stmt, toTable);
-
executeUpdate(stmt, buildRenameTableSql(fromTable, toTable));
-
+ if (additionalSql.isPresent()) {
+ executeUpdate(stmt, additionalSql.get());
+ }
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();