src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.0 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.1

- old
+ new

@@ -278,21 +278,41 @@ protected String buildPreparedMergeSql(String toTable, JdbcSchema toTableSchema, MergeConfig mergeConfig) throws SQLException { throw new UnsupportedOperationException("not implemented"); } + protected void executeSql(String sql) throws SQLException + { + Statement stmt = connection.createStatement(); + try { + executeUpdate(stmt, sql); + commitIfNecessary(connection); + } catch (SQLException ex) { + throw safeRollback(connection, ex); + } finally { + stmt.close(); + } + } + protected void collectInsert(List<String> fromTables, JdbcSchema schema, String toTable, - boolean truncateDestinationFirst) throws SQLException + boolean truncateDestinationFirst, Optional<String> additionalSql) throws SQLException { + if (fromTables.isEmpty()) { + return; + } + Statement stmt = connection.createStatement(); try { if (truncateDestinationFirst) { String sql = buildTruncateSql(toTable); executeUpdate(stmt, sql); } String sql = buildCollectInsertSql(fromTables, schema, toTable); executeUpdate(stmt, sql); + if (additionalSql.isPresent()) { + executeUpdate(stmt, additionalSql.get()); + } commitIfNecessary(connection); } catch (SQLException ex) { throw safeRollback(connection, ex); } finally { stmt.close(); @@ -333,16 +353,24 @@ } return sb.toString(); } - protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException + protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig, + Optional<String> additionalSql) throws SQLException { + if (fromTables.isEmpty()) { + return; + } + Statement stmt = connection.createStatement(); try { String sql = buildCollectMergeSql(fromTables, schema, toTable, mergeConfig); executeUpdate(stmt, sql); + if (additionalSql.isPresent()) { + executeUpdate(stmt, additionalSql.get()); + } commitIfNecessary(connection); } catch (SQLException ex) { throw safeRollback(connection, ex); } finally { stmt.close(); @@ -352,17 +380,18 @@ protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException { throw new UnsupportedOperationException("not implemented"); } - public void replaceTable(String fromTable, JdbcSchema schema, String toTable) throws SQLException + public void replaceTable(String fromTable, JdbcSchema schema, String toTable, Optional<String> additionalSql) throws SQLException { Statement stmt = connection.createStatement(); try { dropTableIfExists(stmt, toTable); - executeUpdate(stmt, buildRenameTableSql(fromTable, toTable)); - + if (additionalSql.isPresent()) { + executeUpdate(stmt, additionalSql.get()); + } commitIfNecessary(connection); } catch (SQLException ex) { throw safeRollback(connection, ex); } finally { stmt.close();