src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.2.0 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.2.1
- old
+ new
@@ -2,12 +2,14 @@
import java.util.List;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+
import org.slf4j.Logger;
import org.embulk.spi.Exec;
public class JdbcOutputConnection
implements AutoCloseable
@@ -56,24 +58,55 @@
} finally {
stmt.close();
}
}
+ public boolean tableExists(String tableName) throws SQLException
+ {
+ try (ResultSet rs = connection.getMetaData().getTables(null, schemaName, tableName, null)) {
+ return rs.next();
+ }
+ }
+
public void dropTableIfExists(String tableName) throws SQLException
{
Statement stmt = connection.createStatement();
try {
- String sql = String.format("DROP TABLE IF EXISTS %s", quoteIdentifierString(tableName));
- executeUpdate(stmt, sql);
+ dropTableIfExists(stmt, tableName);
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();
}
}
+ protected void dropTableIfExists(Statement stmt, String tableName) throws SQLException
+ {
+ String sql = String.format("DROP TABLE IF EXISTS %s", quoteIdentifierString(tableName));
+ executeUpdate(stmt, sql);
+ }
+
+ public void dropTable(String tableName) throws SQLException
+ {
+ Statement stmt = connection.createStatement();
+ try {
+ dropTable(stmt, tableName);
+ commitIfNecessary(connection);
+ } catch (SQLException ex) {
+ throw safeRollback(connection, ex);
+ } finally {
+ stmt.close();
+ }
+ }
+
+ protected void dropTable(Statement stmt, String tableName) throws SQLException
+ {
+ String sql = String.format("DROP TABLE %s", quoteIdentifierString(tableName));
+ executeUpdate(stmt, sql);
+ }
+
public void createTableIfNotExists(String tableName, JdbcSchema schema) throws SQLException
{
Statement stmt = connection.createStatement();
try {
String sql = buildCreateTableIfNotExistsSql(tableName, schema);
@@ -90,10 +123,42 @@
{
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
quoteIdentifierString(sb, name);
+ sb.append(buildColumnsOfCreateTableSql(schema));
+ return sb.toString();
+ }
+
+ public void createTable(String tableName, JdbcSchema schema) throws SQLException
+ {
+ Statement stmt = connection.createStatement();
+ try {
+ String sql = buildCreateTableSql(tableName, schema);
+ executeUpdate(stmt, sql);
+ commitIfNecessary(connection);
+ } catch (SQLException ex) {
+ throw safeRollback(connection, ex);
+ } finally {
+ stmt.close();
+ }
+ }
+
+ protected String buildCreateTableSql(String name, JdbcSchema schema)
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("CREATE TABLE ");
+ quoteIdentifierString(sb, name);
+ sb.append(buildColumnsOfCreateTableSql(schema));
+ return sb.toString();
+ }
+
+ private String buildColumnsOfCreateTableSql(JdbcSchema schema)
+ {
+ StringBuilder sb = new StringBuilder();
+
sb.append(" (");
boolean first = true;
for (JdbcColumn c : schema.getColumns()) {
if (first) { first = false; }
else { sb.append(", "); }
@@ -285,26 +350,18 @@
public void replaceTable(String fromTable, JdbcSchema schema, String toTable) throws SQLException
{
Statement stmt = connection.createStatement();
try {
- {
- StringBuilder sb = new StringBuilder();
- sb.append("DROP TABLE IF EXISTS ");
- quoteIdentifierString(sb, toTable);
- String sql = sb.toString();
- executeUpdate(stmt, sql);
- }
+ dropTableIfExists(stmt, toTable);
- {
- StringBuilder sb = new StringBuilder();
- sb.append("ALTER TABLE ");
- quoteIdentifierString(sb, fromTable);
- sb.append(" RENAME TO ");
- quoteIdentifierString(sb, toTable);
- String sql = sb.toString();
- executeUpdate(stmt, sql);
- }
+ StringBuilder sb = new StringBuilder();
+ sb.append("ALTER TABLE ");
+ quoteIdentifierString(sb, fromTable);
+ sb.append(" RENAME TO ");
+ quoteIdentifierString(sb, toTable);
+ String sql = sb.toString();
+ executeUpdate(stmt, sql);
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {