src/main/java/org/embulk/output/sqlserver/SQLServerOutputConnection.java in embulk-output-sqlserver-0.7.8 vs src/main/java/org/embulk/output/sqlserver/SQLServerOutputConnection.java in embulk-output-sqlserver-0.7.9

- old
+ new

@@ -8,10 +8,11 @@ import org.embulk.output.jdbc.JdbcColumn; import org.embulk.output.jdbc.JdbcOutputConnection; import org.embulk.output.jdbc.JdbcSchema; import org.embulk.output.jdbc.MergeConfig; +import org.embulk.output.jdbc.TableIdentifier; public class SQLServerOutputConnection extends JdbcOutputConnection { public SQLServerOutputConnection(Connection connection, String schemaName, boolean autoCommit) @@ -20,17 +21,22 @@ super(connection, schemaName); connection.setAutoCommit(autoCommit); } @Override - protected String buildRenameTableSql(String fromTable, String toTable) + protected String buildRenameTableSql(TableIdentifier fromTable, TableIdentifier toTable) { + // sp_rename cannot change schema of table StringBuilder sb = new StringBuilder(); sb.append("EXEC sp_rename "); - sb.append(quoteIdentifierString(fromTable)); + if (fromTable.getSchemaName() == null) { + sb.append(quoteIdentifierString(fromTable.getTableName())); + } else { + sb.append(quoteIdentifierString(fromTable.getSchemaName() + "." + fromTable.getTableName())); + } sb.append(", "); - sb.append(quoteIdentifierString(toTable)); + sb.append(quoteIdentifierString(toTable.getTableName())); sb.append(", 'OBJECT'"); return sb.toString(); } @Override @@ -53,57 +59,33 @@ { // NOP } @Override - public void dropTableIfExists(String tableName) throws SQLException + public void dropTableIfExists(TableIdentifier table) throws SQLException { - if (tableExists(tableName)) { - dropTable(tableName); + if (tableExists(table)) { + dropTable(table); } } @Override - protected void dropTableIfExists(Statement stmt, String tableName) throws SQLException + protected void dropTableIfExists(Statement stmt, TableIdentifier table) throws SQLException { - if (tableExists(tableName)) { - dropTable(stmt, tableName); + if (tableExists(table)) { + dropTable(stmt, table); } } @Override - public void createTableIfNotExists(String tableName, JdbcSchema schema) throws SQLException + public void createTableIfNotExists(TableIdentifier table, JdbcSchema schema) throws SQLException { - if (!tableExists(tableName)) { - createTable(tableName, schema); + if (!tableExists(table)) { + createTable(table, schema); } } - public void createTable(String tableName, JdbcSchema schema) throws SQLException - { - Statement stmt = connection.createStatement(); - try { - String sql = buildCreateTableSql(tableName, schema); - executeUpdate(stmt, sql); - commitIfNecessary(connection); - } catch (SQLException ex) { - throw safeRollback(connection, ex); - } finally { - stmt.close(); - } - } - - protected String buildCreateTableSql(String name, JdbcSchema schema) - { - StringBuilder sb = new StringBuilder(); - - sb.append("CREATE TABLE "); - quoteIdentifierString(sb, name); - sb.append(buildCreateTableSchemaSql(schema)); - return sb.toString(); - } - private static final String[] SIMPLE_TYPE_NAMES = { "BIT", "FLOAT", }; @Override @@ -114,23 +96,23 @@ } return super.getColumnDeclareType(convertedTypeName, col); } @Override - protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException + protected String buildCollectMergeSql(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable, MergeConfig mergeConfig) throws SQLException { StringBuilder sb = new StringBuilder(); sb.append("MERGE INTO "); - sb.append(quoteIdentifierString(toTable)); + sb.append(quoteTableIdentifier(toTable)); sb.append(" AS T"); sb.append(" USING ("); for (int i = 0; i < fromTables.size(); i++) { if (i != 0) { sb.append(" UNION ALL "); } sb.append(" SELECT "); sb.append(buildColumns(schema, "")); sb.append(" FROM "); - sb.append(quoteIdentifierString(fromTables.get(i))); + sb.append(quoteTableIdentifier(fromTables.get(i))); } sb.append(") AS S"); sb.append(" ON ("); for (int i = 0; i < mergeConfig.getMergeKeys().size(); i++) { if (i != 0) { sb.append(" AND "); }