src/main/java/org/embulk/output/sqlserver/SQLServerOutputConnection.java in embulk-output-sqlserver-0.7.8 vs src/main/java/org/embulk/output/sqlserver/SQLServerOutputConnection.java in embulk-output-sqlserver-0.7.9
- old
+ new
@@ -8,10 +8,11 @@
import org.embulk.output.jdbc.JdbcColumn;
import org.embulk.output.jdbc.JdbcOutputConnection;
import org.embulk.output.jdbc.JdbcSchema;
import org.embulk.output.jdbc.MergeConfig;
+import org.embulk.output.jdbc.TableIdentifier;
public class SQLServerOutputConnection
extends JdbcOutputConnection
{
public SQLServerOutputConnection(Connection connection, String schemaName, boolean autoCommit)
@@ -20,17 +21,22 @@
super(connection, schemaName);
connection.setAutoCommit(autoCommit);
}
@Override
- protected String buildRenameTableSql(String fromTable, String toTable)
+ protected String buildRenameTableSql(TableIdentifier fromTable, TableIdentifier toTable)
{
+ // sp_rename cannot change schema of table
StringBuilder sb = new StringBuilder();
sb.append("EXEC sp_rename ");
- sb.append(quoteIdentifierString(fromTable));
+ if (fromTable.getSchemaName() == null) {
+ sb.append(quoteIdentifierString(fromTable.getTableName()));
+ } else {
+ sb.append(quoteIdentifierString(fromTable.getSchemaName() + "." + fromTable.getTableName()));
+ }
sb.append(", ");
- sb.append(quoteIdentifierString(toTable));
+ sb.append(quoteIdentifierString(toTable.getTableName()));
sb.append(", 'OBJECT'");
return sb.toString();
}
@Override
@@ -53,57 +59,33 @@
{
// NOP
}
@Override
- public void dropTableIfExists(String tableName) throws SQLException
+ public void dropTableIfExists(TableIdentifier table) throws SQLException
{
- if (tableExists(tableName)) {
- dropTable(tableName);
+ if (tableExists(table)) {
+ dropTable(table);
}
}
@Override
- protected void dropTableIfExists(Statement stmt, String tableName) throws SQLException
+ protected void dropTableIfExists(Statement stmt, TableIdentifier table) throws SQLException
{
- if (tableExists(tableName)) {
- dropTable(stmt, tableName);
+ if (tableExists(table)) {
+ dropTable(stmt, table);
}
}
@Override
- public void createTableIfNotExists(String tableName, JdbcSchema schema) throws SQLException
+ public void createTableIfNotExists(TableIdentifier table, JdbcSchema schema) throws SQLException
{
- if (!tableExists(tableName)) {
- createTable(tableName, schema);
+ if (!tableExists(table)) {
+ createTable(table, schema);
}
}
- public void createTable(String tableName, JdbcSchema schema) throws SQLException
- {
- Statement stmt = connection.createStatement();
- try {
- String sql = buildCreateTableSql(tableName, schema);
- executeUpdate(stmt, sql);
- commitIfNecessary(connection);
- } catch (SQLException ex) {
- throw safeRollback(connection, ex);
- } finally {
- stmt.close();
- }
- }
-
- protected String buildCreateTableSql(String name, JdbcSchema schema)
- {
- StringBuilder sb = new StringBuilder();
-
- sb.append("CREATE TABLE ");
- quoteIdentifierString(sb, name);
- sb.append(buildCreateTableSchemaSql(schema));
- return sb.toString();
- }
-
private static final String[] SIMPLE_TYPE_NAMES = {
"BIT", "FLOAT",
};
@Override
@@ -114,23 +96,23 @@
}
return super.getColumnDeclareType(convertedTypeName, col);
}
@Override
- protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException
+ protected String buildCollectMergeSql(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable, MergeConfig mergeConfig) throws SQLException
{
StringBuilder sb = new StringBuilder();
sb.append("MERGE INTO ");
- sb.append(quoteIdentifierString(toTable));
+ sb.append(quoteTableIdentifier(toTable));
sb.append(" AS T");
sb.append(" USING (");
for (int i = 0; i < fromTables.size(); i++) {
if (i != 0) { sb.append(" UNION ALL "); }
sb.append(" SELECT ");
sb.append(buildColumns(schema, ""));
sb.append(" FROM ");
- sb.append(quoteIdentifierString(fromTables.get(i)));
+ sb.append(quoteTableIdentifier(fromTables.get(i)));
}
sb.append(") AS S");
sb.append(" ON (");
for (int i = 0; i < mergeConfig.getMergeKeys().size(); i++) {
if (i != 0) { sb.append(" AND "); }