src/main/java/org/embulk/output/oracle/OracleOutputConnection.java in embulk-output-oracle-0.7.8 vs src/main/java/org/embulk/output/oracle/OracleOutputConnection.java in embulk-output-oracle-0.7.9

- old
+ new

@@ -9,14 +9,15 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.embulk.output.jdbc.JdbcOutputConnection; import org.embulk.output.jdbc.JdbcColumn; +import org.embulk.output.jdbc.JdbcOutputConnection; import org.embulk.output.jdbc.JdbcSchema; import org.embulk.output.jdbc.MergeConfig; +import org.embulk.output.jdbc.TableIdentifier; public class OracleOutputConnection extends JdbcOutputConnection { private static final Map<String, String> CHARSET_NAMES = new HashMap<String, String>(); @@ -61,56 +62,32 @@ connection.setSchema(schema); } } @Override - public void dropTableIfExists(String tableName) throws SQLException + public void dropTableIfExists(TableIdentifier table) throws SQLException { - if (tableExists(tableName)) { - dropTable(tableName); + if (tableExists(table)) { + dropTable(table); } } @Override - protected void dropTableIfExists(Statement stmt, String tableName) throws SQLException { - if (tableExists(tableName)) { - dropTable(stmt, tableName); + protected void dropTableIfExists(Statement stmt, TableIdentifier table) throws SQLException { + if (tableExists(table)) { + dropTable(stmt, table); } } @Override - public void createTableIfNotExists(String tableName, JdbcSchema schema) throws SQLException + public void createTableIfNotExists(TableIdentifier table, JdbcSchema schema) throws SQLException { - if (!tableExists(tableName)) { - createTable(tableName, schema); + if (!tableExists(table)) { + createTable(table, schema); } } - public void createTable(String tableName, JdbcSchema schema) throws SQLException - { - Statement stmt = connection.createStatement(); - try { - String sql = buildCreateTableSql(tableName, schema); - executeUpdate(stmt, sql); - commitIfNecessary(connection); - } catch (SQLException ex) { - throw safeRollback(connection, ex); - } finally { - stmt.close(); - } - } - - protected String buildCreateTableSql(String name, JdbcSchema schema) - { - StringBuilder sb = new StringBuilder(); - - sb.append("CREATE TABLE "); - quoteIdentifierString(sb, name); - sb.append(buildCreateTableSchemaSql(schema)); - return sb.toString(); - } - private static String getSchema(Connection connection) throws SQLException { // Because old Oracle JDBC drivers don't support Connection#getSchema method. String sql = "SELECT SYS_CONTEXT('USERENV', 'CURRENT_SCHEMA') FROM DUAL"; try (Statement statement = connection.createStatement()) { @@ -122,12 +99,24 @@ } } } @Override - protected String buildPreparedInsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException + protected String buildRenameTableSql(TableIdentifier fromTable, TableIdentifier toTable) { + // ALTER TABLE doesn't support schema + StringBuilder sb = new StringBuilder(); + sb.append("ALTER TABLE "); + quoteIdentifierString(sb, fromTable.getTableName()); + sb.append(" RENAME TO "); + quoteIdentifierString(sb, toTable.getTableName()); + return sb.toString(); + } + + @Override + protected String buildPreparedInsertSql(TableIdentifier toTable, JdbcSchema toTableSchema) throws SQLException + { String sql = super.buildPreparedInsertSql(toTable, toTableSchema); if (direct) { sql = sql.replaceAll("^INSERT ", "INSERT /*+ APPEND_VALUES */ "); } return sql; @@ -197,23 +186,23 @@ } return super.getColumnDeclareType(convertedTypeName, col); } @Override - protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException + protected String buildCollectMergeSql(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable, MergeConfig mergeConfig) throws SQLException { StringBuilder sb = new StringBuilder(); sb.append("MERGE INTO "); - sb.append(quoteIdentifierString(toTable)); + sb.append(quoteTableIdentifier(toTable)); sb.append(" T"); sb.append(" USING ("); for (int i = 0; i < fromTables.size(); i++) { if (i != 0) { sb.append(" UNION ALL "); } sb.append(" SELECT "); sb.append(buildColumns(schema, "")); sb.append(" FROM "); - sb.append(quoteIdentifierString(fromTables.get(i))); + sb.append(quoteTableIdentifier(fromTables.get(i))); } sb.append(") S"); sb.append(" ON ("); for (int i = 0; i < mergeConfig.getMergeKeys().size(); i++) { if (i != 0) { sb.append(" AND "); }