src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.8 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.9
- old
+ new
@@ -9,11 +9,13 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.slf4j.Logger;
+
import com.google.common.base.Optional;
+
import org.embulk.spi.Exec;
public class JdbcOutputConnection
implements AutoCloseable
{
@@ -68,79 +70,108 @@
} finally {
stmt.close();
}
}
- public boolean tableExists(String tableName) throws SQLException
+ public boolean tableExists(TableIdentifier table) throws SQLException
{
- try (ResultSet rs = connection.getMetaData().getTables(null, schemaName, tableName, null)) {
+ try (ResultSet rs = connection.getMetaData().getTables(table.getDatabase(), table.getSchemaName(), table.getTableName(), null)) {
return rs.next();
}
}
- public void dropTableIfExists(String tableName) throws SQLException
+ public boolean tableExists(String tableName) throws SQLException
{
+ return tableExists(new TableIdentifier(null, schemaName, tableName));
+ }
+
+ public void dropTableIfExists(TableIdentifier table) throws SQLException
+ {
Statement stmt = connection.createStatement();
try {
- dropTableIfExists(stmt, tableName);
+ dropTableIfExists(stmt, table);
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();
}
}
- protected void dropTableIfExists(Statement stmt, String tableName) throws SQLException
+ protected void dropTableIfExists(Statement stmt, TableIdentifier table) throws SQLException
{
- String sql = String.format("DROP TABLE IF EXISTS %s", quoteIdentifierString(tableName));
+ String sql = String.format("DROP TABLE IF EXISTS %s", quoteTableIdentifier(table));
executeUpdate(stmt, sql);
}
- public void dropTable(String tableName) throws SQLException
+ public void dropTable(TableIdentifier table) throws SQLException
{
Statement stmt = connection.createStatement();
try {
- dropTable(stmt, tableName);
+ dropTable(stmt, table);
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();
}
}
- protected void dropTable(Statement stmt, String tableName) throws SQLException
+ protected void dropTable(Statement stmt, TableIdentifier table) throws SQLException
{
- String sql = String.format("DROP TABLE %s", quoteIdentifierString(tableName));
+ String sql = String.format("DROP TABLE %s", quoteTableIdentifier(table));
executeUpdate(stmt, sql);
}
- public void createTableIfNotExists(String tableName, JdbcSchema schema) throws SQLException
+ public void createTableIfNotExists(TableIdentifier targetTable, JdbcSchema schema) throws SQLException
{
Statement stmt = connection.createStatement();
try {
- String sql = buildCreateTableIfNotExistsSql(tableName, schema);
+ String sql = buildCreateTableIfNotExistsSql(targetTable, schema);
executeUpdate(stmt, sql);
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();
}
}
- protected String buildCreateTableIfNotExistsSql(String name, JdbcSchema schema)
+ protected String buildCreateTableIfNotExistsSql(TableIdentifier table, JdbcSchema schema)
{
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
- quoteIdentifierString(sb, name);
+ quoteTableIdentifier(sb, table);
sb.append(buildCreateTableSchemaSql(schema));
return sb.toString();
}
+ public void createTable(TableIdentifier table, JdbcSchema schema) throws SQLException
+ {
+ Statement stmt = connection.createStatement();
+ try {
+ String sql = buildCreateTableSql(table, schema);
+ executeUpdate(stmt, sql);
+ commitIfNecessary(connection);
+ } catch (SQLException ex) {
+ throw safeRollback(connection, ex);
+ } finally {
+ stmt.close();
+ }
+ }
+
+ protected String buildCreateTableSql(TableIdentifier table, JdbcSchema schema)
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("CREATE TABLE ");
+ quoteTableIdentifier(sb, table);
+ sb.append(buildCreateTableSchemaSql(schema));
+ return sb.toString();
+ }
+
protected String buildCreateTableSchemaSql(JdbcSchema schema)
{
StringBuilder sb = new StringBuilder();
sb.append(" (");
@@ -154,17 +185,17 @@
sb.append(")");
return sb.toString();
}
- protected String buildRenameTableSql(String fromTable, String toTable)
+ protected String buildRenameTableSql(TableIdentifier fromTable, TableIdentifier toTable)
{
StringBuilder sb = new StringBuilder();
sb.append("ALTER TABLE ");
- quoteIdentifierString(sb, fromTable);
+ quoteTableIdentifier(sb, fromTable);
sb.append(" RENAME TO ");
- quoteIdentifierString(sb, toTable);
+ quoteTableIdentifier(sb, toTable);
return sb.toString();
}
public static enum ColumnDeclareType
{
@@ -239,11 +270,11 @@
}
return ColumnDeclareType.SIMPLE;
}
- public PreparedStatement prepareBatchInsertStatement(String toTable, JdbcSchema toTableSchema, Optional<MergeConfig> mergeConfig) throws SQLException
+ public PreparedStatement prepareBatchInsertStatement(TableIdentifier toTable, JdbcSchema toTableSchema, Optional<MergeConfig> mergeConfig) throws SQLException
{
String sql;
if (mergeConfig.isPresent()) {
sql = buildPreparedMergeSql(toTable, toTableSchema, mergeConfig.get());
} else {
@@ -251,16 +282,16 @@
}
logger.info("Prepared SQL: {}", sql);
return connection.prepareStatement(sql);
}
- protected String buildPreparedInsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException
+ protected String buildPreparedInsertSql(TableIdentifier toTable, JdbcSchema toTableSchema) throws SQLException
{
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO ");
- quoteIdentifierString(sb, toTable);
+ quoteTableIdentifier(sb, toTable);
sb.append(" (");
for (int i=0; i < toTableSchema.getCount(); i++) {
if(i != 0) { sb.append(", "); }
quoteIdentifierString(sb, toTableSchema.getColumnName(i));
@@ -273,11 +304,11 @@
sb.append(")");
return sb.toString();
}
- protected String buildPreparedMergeSql(String toTable, JdbcSchema toTableSchema, MergeConfig mergeConfig) throws SQLException
+ protected String buildPreparedMergeSql(TableIdentifier toTable, JdbcSchema toTableSchema, MergeConfig mergeConfig) throws SQLException
{
throw new UnsupportedOperationException("not implemented");
}
protected void executeSql(String sql) throws SQLException
@@ -291,11 +322,11 @@
} finally {
stmt.close();
}
}
- protected void collectInsert(List<String> fromTables, JdbcSchema schema, String toTable,
+ protected void collectInsert(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable,
boolean truncateDestinationFirst, Optional<String> preSql, Optional<String> postSql) throws SQLException
{
if (fromTables.isEmpty()) {
return;
}
@@ -324,26 +355,26 @@
} finally {
stmt.close();
}
}
- protected String buildTruncateSql(String table)
+ protected String buildTruncateSql(TableIdentifier table)
{
StringBuilder sb = new StringBuilder();
sb.append("DELETE FROM ");
- quoteIdentifierString(sb, table);
+ quoteTableIdentifier(sb, table);
return sb.toString();
}
- protected String buildCollectInsertSql(List<String> fromTables, JdbcSchema schema, String toTable)
+ protected String buildCollectInsertSql(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable)
{
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO ");
- quoteIdentifierString(sb, toTable);
+ quoteTableIdentifier(sb, toTable);
sb.append(" (");
for (int i=0; i < schema.getCount(); i++) {
if (i != 0) { sb.append(", "); }
quoteIdentifierString(sb, schema.getColumnName(i));
}
@@ -354,17 +385,17 @@
for (int j=0; j < schema.getCount(); j++) {
if (j != 0) { sb.append(", "); }
quoteIdentifierString(sb, schema.getColumnName(j));
}
sb.append(" FROM ");
- quoteIdentifierString(sb, fromTables.get(i));
+ quoteTableIdentifier(sb, fromTables.get(i));
}
return sb.toString();
}
- protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig,
+ protected void collectMerge(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable, MergeConfig mergeConfig,
Optional<String> preSql, Optional<String> postSql) throws SQLException
{
if (fromTables.isEmpty()) {
return;
}
@@ -388,16 +419,16 @@
} finally {
stmt.close();
}
}
- protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException
+ protected String buildCollectMergeSql(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable, MergeConfig mergeConfig) throws SQLException
{
throw new UnsupportedOperationException("not implemented");
}
- public void replaceTable(String fromTable, JdbcSchema schema, String toTable, Optional<String> postSql) throws SQLException
+ public void replaceTable(TableIdentifier fromTable, JdbcSchema schema, TableIdentifier toTable, Optional<String> postSql) throws SQLException
{
Statement stmt = connection.createStatement();
try {
dropTableIfExists(stmt, toTable);
@@ -413,12 +444,32 @@
} finally {
stmt.close();
}
}
+ protected String quoteTableIdentifier(TableIdentifier table)
+ {
+ StringBuilder sb = new StringBuilder();
+ if (table.getDatabase() != null) {
+ sb.append(quoteIdentifierString(table.getDatabase(), identifierQuoteString));
+ sb.append(".");
+ }
+ if (table.getSchemaName() != null) {
+ sb.append(quoteIdentifierString(table.getSchemaName(), identifierQuoteString));
+ sb.append(".");
+ }
+ sb.append(quoteIdentifierString(table.getTableName(), identifierQuoteString));
+ return sb.toString();
+ }
+
+ protected void quoteTableIdentifier(StringBuilder sb, TableIdentifier table)
+ {
+ sb.append(quoteTableIdentifier(table));
+ }
+
protected void quoteIdentifierString(StringBuilder sb, String str)
{
- sb.append(quoteIdentifierString(str, identifierQuoteString));
+ sb.append(quoteIdentifierString(str));
}
protected String quoteIdentifierString(String str)
{
return quoteIdentifierString(str, identifierQuoteString);