src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.8 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.9

- old
+ new

@@ -9,11 +9,13 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import org.slf4j.Logger; + import com.google.common.base.Optional; + import org.embulk.spi.Exec; public class JdbcOutputConnection implements AutoCloseable { @@ -68,79 +70,108 @@ } finally { stmt.close(); } } - public boolean tableExists(String tableName) throws SQLException + public boolean tableExists(TableIdentifier table) throws SQLException { - try (ResultSet rs = connection.getMetaData().getTables(null, schemaName, tableName, null)) { + try (ResultSet rs = connection.getMetaData().getTables(table.getDatabase(), table.getSchemaName(), table.getTableName(), null)) { return rs.next(); } } - public void dropTableIfExists(String tableName) throws SQLException + public boolean tableExists(String tableName) throws SQLException { + return tableExists(new TableIdentifier(null, schemaName, tableName)); + } + + public void dropTableIfExists(TableIdentifier table) throws SQLException + { Statement stmt = connection.createStatement(); try { - dropTableIfExists(stmt, tableName); + dropTableIfExists(stmt, table); commitIfNecessary(connection); } catch (SQLException ex) { throw safeRollback(connection, ex); } finally { stmt.close(); } } - protected void dropTableIfExists(Statement stmt, String tableName) throws SQLException + protected void dropTableIfExists(Statement stmt, TableIdentifier table) throws SQLException { - String sql = String.format("DROP TABLE IF EXISTS %s", quoteIdentifierString(tableName)); + String sql = String.format("DROP TABLE IF EXISTS %s", quoteTableIdentifier(table)); executeUpdate(stmt, sql); } - public void dropTable(String tableName) throws SQLException + public void dropTable(TableIdentifier table) throws SQLException { Statement stmt = connection.createStatement(); try { - dropTable(stmt, tableName); + dropTable(stmt, table); commitIfNecessary(connection); } catch (SQLException ex) { throw safeRollback(connection, ex); } finally { stmt.close(); } } - protected void dropTable(Statement stmt, String tableName) throws SQLException + protected void dropTable(Statement stmt, TableIdentifier table) throws SQLException { - String sql = String.format("DROP TABLE %s", quoteIdentifierString(tableName)); + String sql = String.format("DROP TABLE %s", quoteTableIdentifier(table)); executeUpdate(stmt, sql); } - public void createTableIfNotExists(String tableName, JdbcSchema schema) throws SQLException + public void createTableIfNotExists(TableIdentifier targetTable, JdbcSchema schema) throws SQLException { Statement stmt = connection.createStatement(); try { - String sql = buildCreateTableIfNotExistsSql(tableName, schema); + String sql = buildCreateTableIfNotExistsSql(targetTable, schema); executeUpdate(stmt, sql); commitIfNecessary(connection); } catch (SQLException ex) { throw safeRollback(connection, ex); } finally { stmt.close(); } } - protected String buildCreateTableIfNotExistsSql(String name, JdbcSchema schema) + protected String buildCreateTableIfNotExistsSql(TableIdentifier table, JdbcSchema schema) { StringBuilder sb = new StringBuilder(); sb.append("CREATE TABLE IF NOT EXISTS "); - quoteIdentifierString(sb, name); + quoteTableIdentifier(sb, table); sb.append(buildCreateTableSchemaSql(schema)); return sb.toString(); } + public void createTable(TableIdentifier table, JdbcSchema schema) throws SQLException + { + Statement stmt = connection.createStatement(); + try { + String sql = buildCreateTableSql(table, schema); + executeUpdate(stmt, sql); + commitIfNecessary(connection); + } catch (SQLException ex) { + throw safeRollback(connection, ex); + } finally { + stmt.close(); + } + } + + protected String buildCreateTableSql(TableIdentifier table, JdbcSchema schema) + { + StringBuilder sb = new StringBuilder(); + + sb.append("CREATE TABLE "); + quoteTableIdentifier(sb, table); + sb.append(buildCreateTableSchemaSql(schema)); + return sb.toString(); + } + protected String buildCreateTableSchemaSql(JdbcSchema schema) { StringBuilder sb = new StringBuilder(); sb.append(" ("); @@ -154,17 +185,17 @@ sb.append(")"); return sb.toString(); } - protected String buildRenameTableSql(String fromTable, String toTable) + protected String buildRenameTableSql(TableIdentifier fromTable, TableIdentifier toTable) { StringBuilder sb = new StringBuilder(); sb.append("ALTER TABLE "); - quoteIdentifierString(sb, fromTable); + quoteTableIdentifier(sb, fromTable); sb.append(" RENAME TO "); - quoteIdentifierString(sb, toTable); + quoteTableIdentifier(sb, toTable); return sb.toString(); } public static enum ColumnDeclareType { @@ -239,11 +270,11 @@ } return ColumnDeclareType.SIMPLE; } - public PreparedStatement prepareBatchInsertStatement(String toTable, JdbcSchema toTableSchema, Optional<MergeConfig> mergeConfig) throws SQLException + public PreparedStatement prepareBatchInsertStatement(TableIdentifier toTable, JdbcSchema toTableSchema, Optional<MergeConfig> mergeConfig) throws SQLException { String sql; if (mergeConfig.isPresent()) { sql = buildPreparedMergeSql(toTable, toTableSchema, mergeConfig.get()); } else { @@ -251,16 +282,16 @@ } logger.info("Prepared SQL: {}", sql); return connection.prepareStatement(sql); } - protected String buildPreparedInsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException + protected String buildPreparedInsertSql(TableIdentifier toTable, JdbcSchema toTableSchema) throws SQLException { StringBuilder sb = new StringBuilder(); sb.append("INSERT INTO "); - quoteIdentifierString(sb, toTable); + quoteTableIdentifier(sb, toTable); sb.append(" ("); for (int i=0; i < toTableSchema.getCount(); i++) { if(i != 0) { sb.append(", "); } quoteIdentifierString(sb, toTableSchema.getColumnName(i)); @@ -273,11 +304,11 @@ sb.append(")"); return sb.toString(); } - protected String buildPreparedMergeSql(String toTable, JdbcSchema toTableSchema, MergeConfig mergeConfig) throws SQLException + protected String buildPreparedMergeSql(TableIdentifier toTable, JdbcSchema toTableSchema, MergeConfig mergeConfig) throws SQLException { throw new UnsupportedOperationException("not implemented"); } protected void executeSql(String sql) throws SQLException @@ -291,11 +322,11 @@ } finally { stmt.close(); } } - protected void collectInsert(List<String> fromTables, JdbcSchema schema, String toTable, + protected void collectInsert(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable, boolean truncateDestinationFirst, Optional<String> preSql, Optional<String> postSql) throws SQLException { if (fromTables.isEmpty()) { return; } @@ -324,26 +355,26 @@ } finally { stmt.close(); } } - protected String buildTruncateSql(String table) + protected String buildTruncateSql(TableIdentifier table) { StringBuilder sb = new StringBuilder(); sb.append("DELETE FROM "); - quoteIdentifierString(sb, table); + quoteTableIdentifier(sb, table); return sb.toString(); } - protected String buildCollectInsertSql(List<String> fromTables, JdbcSchema schema, String toTable) + protected String buildCollectInsertSql(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable) { StringBuilder sb = new StringBuilder(); sb.append("INSERT INTO "); - quoteIdentifierString(sb, toTable); + quoteTableIdentifier(sb, toTable); sb.append(" ("); for (int i=0; i < schema.getCount(); i++) { if (i != 0) { sb.append(", "); } quoteIdentifierString(sb, schema.getColumnName(i)); } @@ -354,17 +385,17 @@ for (int j=0; j < schema.getCount(); j++) { if (j != 0) { sb.append(", "); } quoteIdentifierString(sb, schema.getColumnName(j)); } sb.append(" FROM "); - quoteIdentifierString(sb, fromTables.get(i)); + quoteTableIdentifier(sb, fromTables.get(i)); } return sb.toString(); } - protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig, + protected void collectMerge(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable, MergeConfig mergeConfig, Optional<String> preSql, Optional<String> postSql) throws SQLException { if (fromTables.isEmpty()) { return; } @@ -388,16 +419,16 @@ } finally { stmt.close(); } } - protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException + protected String buildCollectMergeSql(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable, MergeConfig mergeConfig) throws SQLException { throw new UnsupportedOperationException("not implemented"); } - public void replaceTable(String fromTable, JdbcSchema schema, String toTable, Optional<String> postSql) throws SQLException + public void replaceTable(TableIdentifier fromTable, JdbcSchema schema, TableIdentifier toTable, Optional<String> postSql) throws SQLException { Statement stmt = connection.createStatement(); try { dropTableIfExists(stmt, toTable); @@ -413,12 +444,32 @@ } finally { stmt.close(); } } + protected String quoteTableIdentifier(TableIdentifier table) + { + StringBuilder sb = new StringBuilder(); + if (table.getDatabase() != null) { + sb.append(quoteIdentifierString(table.getDatabase(), identifierQuoteString)); + sb.append("."); + } + if (table.getSchemaName() != null) { + sb.append(quoteIdentifierString(table.getSchemaName(), identifierQuoteString)); + sb.append("."); + } + sb.append(quoteIdentifierString(table.getTableName(), identifierQuoteString)); + return sb.toString(); + } + + protected void quoteTableIdentifier(StringBuilder sb, TableIdentifier table) + { + sb.append(quoteTableIdentifier(table)); + } + protected void quoteIdentifierString(StringBuilder sb, String str) { - sb.append(quoteIdentifierString(str, identifierQuoteString)); + sb.append(quoteIdentifierString(str)); } protected String quoteIdentifierString(String str) { return quoteIdentifierString(str, identifierQuoteString);