src/main/java/org/embulk/output/oracle/OracleOutputConnection.java in embulk-output-oracle-0.7.8 vs src/main/java/org/embulk/output/oracle/OracleOutputConnection.java in embulk-output-oracle-0.7.9
- old
+ new
@@ -9,14 +9,15 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.embulk.output.jdbc.JdbcOutputConnection;
import org.embulk.output.jdbc.JdbcColumn;
+import org.embulk.output.jdbc.JdbcOutputConnection;
import org.embulk.output.jdbc.JdbcSchema;
import org.embulk.output.jdbc.MergeConfig;
+import org.embulk.output.jdbc.TableIdentifier;
public class OracleOutputConnection
extends JdbcOutputConnection
{
private static final Map<String, String> CHARSET_NAMES = new HashMap<String, String>();
@@ -61,56 +62,32 @@
connection.setSchema(schema);
}
}
@Override
- public void dropTableIfExists(String tableName) throws SQLException
+ public void dropTableIfExists(TableIdentifier table) throws SQLException
{
- if (tableExists(tableName)) {
- dropTable(tableName);
+ if (tableExists(table)) {
+ dropTable(table);
}
}
@Override
- protected void dropTableIfExists(Statement stmt, String tableName) throws SQLException {
- if (tableExists(tableName)) {
- dropTable(stmt, tableName);
+ protected void dropTableIfExists(Statement stmt, TableIdentifier table) throws SQLException {
+ if (tableExists(table)) {
+ dropTable(stmt, table);
}
}
@Override
- public void createTableIfNotExists(String tableName, JdbcSchema schema) throws SQLException
+ public void createTableIfNotExists(TableIdentifier table, JdbcSchema schema) throws SQLException
{
- if (!tableExists(tableName)) {
- createTable(tableName, schema);
+ if (!tableExists(table)) {
+ createTable(table, schema);
}
}
- public void createTable(String tableName, JdbcSchema schema) throws SQLException
- {
- Statement stmt = connection.createStatement();
- try {
- String sql = buildCreateTableSql(tableName, schema);
- executeUpdate(stmt, sql);
- commitIfNecessary(connection);
- } catch (SQLException ex) {
- throw safeRollback(connection, ex);
- } finally {
- stmt.close();
- }
- }
-
- protected String buildCreateTableSql(String name, JdbcSchema schema)
- {
- StringBuilder sb = new StringBuilder();
-
- sb.append("CREATE TABLE ");
- quoteIdentifierString(sb, name);
- sb.append(buildCreateTableSchemaSql(schema));
- return sb.toString();
- }
-
private static String getSchema(Connection connection) throws SQLException
{
// Because old Oracle JDBC drivers don't support Connection#getSchema method.
String sql = "SELECT SYS_CONTEXT('USERENV', 'CURRENT_SCHEMA') FROM DUAL";
try (Statement statement = connection.createStatement()) {
@@ -122,12 +99,24 @@
}
}
}
@Override
- protected String buildPreparedInsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException
+ protected String buildRenameTableSql(TableIdentifier fromTable, TableIdentifier toTable)
{
+ // ALTER TABLE doesn't support schema
+ StringBuilder sb = new StringBuilder();
+ sb.append("ALTER TABLE ");
+ quoteIdentifierString(sb, fromTable.getTableName());
+ sb.append(" RENAME TO ");
+ quoteIdentifierString(sb, toTable.getTableName());
+ return sb.toString();
+ }
+
+ @Override
+ protected String buildPreparedInsertSql(TableIdentifier toTable, JdbcSchema toTableSchema) throws SQLException
+ {
String sql = super.buildPreparedInsertSql(toTable, toTableSchema);
if (direct) {
sql = sql.replaceAll("^INSERT ", "INSERT /*+ APPEND_VALUES */ ");
}
return sql;
@@ -197,23 +186,23 @@
}
return super.getColumnDeclareType(convertedTypeName, col);
}
@Override
- protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException
+ protected String buildCollectMergeSql(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable, MergeConfig mergeConfig) throws SQLException
{
StringBuilder sb = new StringBuilder();
sb.append("MERGE INTO ");
- sb.append(quoteIdentifierString(toTable));
+ sb.append(quoteTableIdentifier(toTable));
sb.append(" T");
sb.append(" USING (");
for (int i = 0; i < fromTables.size(); i++) {
if (i != 0) { sb.append(" UNION ALL "); }
sb.append(" SELECT ");
sb.append(buildColumns(schema, ""));
sb.append(" FROM ");
- sb.append(quoteIdentifierString(fromTables.get(i)));
+ sb.append(quoteTableIdentifier(fromTables.get(i)));
}
sb.append(") S");
sb.append(" ON (");
for (int i = 0; i < mergeConfig.getMergeKeys().size(); i++) {
if (i != 0) { sb.append(" AND "); }