src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.2.0 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.2.1

- old
+ new

@@ -2,12 +2,14 @@ import java.util.List; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; + import org.slf4j.Logger; import org.embulk.spi.Exec; public class JdbcOutputConnection implements AutoCloseable @@ -56,24 +58,55 @@ } finally { stmt.close(); } } + public boolean tableExists(String tableName) throws SQLException + { + try (ResultSet rs = connection.getMetaData().getTables(null, schemaName, tableName, null)) { + return rs.next(); + } + } + public void dropTableIfExists(String tableName) throws SQLException { Statement stmt = connection.createStatement(); try { - String sql = String.format("DROP TABLE IF EXISTS %s", quoteIdentifierString(tableName)); - executeUpdate(stmt, sql); + dropTableIfExists(stmt, tableName); commitIfNecessary(connection); } catch (SQLException ex) { throw safeRollback(connection, ex); } finally { stmt.close(); } } + protected void dropTableIfExists(Statement stmt, String tableName) throws SQLException + { + String sql = String.format("DROP TABLE IF EXISTS %s", quoteIdentifierString(tableName)); + executeUpdate(stmt, sql); + } + + public void dropTable(String tableName) throws SQLException + { + Statement stmt = connection.createStatement(); + try { + dropTable(stmt, tableName); + commitIfNecessary(connection); + } catch (SQLException ex) { + throw safeRollback(connection, ex); + } finally { + stmt.close(); + } + } + + protected void dropTable(Statement stmt, String tableName) throws SQLException + { + String sql = String.format("DROP TABLE %s", quoteIdentifierString(tableName)); + executeUpdate(stmt, sql); + } + public void createTableIfNotExists(String tableName, JdbcSchema schema) throws SQLException { Statement stmt = connection.createStatement(); try { String sql = buildCreateTableIfNotExistsSql(tableName, schema); @@ -90,10 +123,42 @@ { StringBuilder sb = new StringBuilder(); sb.append("CREATE TABLE IF NOT EXISTS "); quoteIdentifierString(sb, name); + sb.append(buildColumnsOfCreateTableSql(schema)); + return sb.toString(); + } + + public void createTable(String tableName, JdbcSchema schema) throws SQLException + { + Statement stmt = connection.createStatement(); + try { + String sql = buildCreateTableSql(tableName, schema); + executeUpdate(stmt, sql); + commitIfNecessary(connection); + } catch (SQLException ex) { + throw safeRollback(connection, ex); + } finally { + stmt.close(); + } + } + + protected String buildCreateTableSql(String name, JdbcSchema schema) + { + StringBuilder sb = new StringBuilder(); + + sb.append("CREATE TABLE "); + quoteIdentifierString(sb, name); + sb.append(buildColumnsOfCreateTableSql(schema)); + return sb.toString(); + } + + private String buildColumnsOfCreateTableSql(JdbcSchema schema) + { + StringBuilder sb = new StringBuilder(); + sb.append(" ("); boolean first = true; for (JdbcColumn c : schema.getColumns()) { if (first) { first = false; } else { sb.append(", "); } @@ -285,26 +350,18 @@ public void replaceTable(String fromTable, JdbcSchema schema, String toTable) throws SQLException { Statement stmt = connection.createStatement(); try { - { - StringBuilder sb = new StringBuilder(); - sb.append("DROP TABLE IF EXISTS "); - quoteIdentifierString(sb, toTable); - String sql = sb.toString(); - executeUpdate(stmt, sql); - } + dropTableIfExists(stmt, toTable); - { - StringBuilder sb = new StringBuilder(); - sb.append("ALTER TABLE "); - quoteIdentifierString(sb, fromTable); - sb.append(" RENAME TO "); - quoteIdentifierString(sb, toTable); - String sql = sb.toString(); - executeUpdate(stmt, sql); - } + StringBuilder sb = new StringBuilder(); + sb.append("ALTER TABLE "); + quoteIdentifierString(sb, fromTable); + sb.append(" RENAME TO "); + quoteIdentifierString(sb, toTable); + String sql = sb.toString(); + executeUpdate(stmt, sql); commitIfNecessary(connection); } catch (SQLException ex) { throw safeRollback(connection, ex); } finally {