src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.12 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.13
- old
+ new
@@ -83,10 +83,15 @@
public boolean tableExists(String tableName) throws SQLException
{
return tableExists(new TableIdentifier(null, schemaName, tableName));
}
+ protected boolean supportsTableIfExistsClause()
+ {
+ return true;
+ }
+
public void dropTableIfExists(TableIdentifier table) throws SQLException
{
Statement stmt = connection.createStatement();
try {
dropTableIfExists(stmt, table);
@@ -98,12 +103,18 @@
}
}
protected void dropTableIfExists(Statement stmt, TableIdentifier table) throws SQLException
{
- String sql = String.format("DROP TABLE IF EXISTS %s", quoteTableIdentifier(table));
- executeUpdate(stmt, sql);
+ if (supportsTableIfExistsClause()) {
+ String sql = String.format("DROP TABLE IF EXISTS %s", quoteTableIdentifier(table));
+ executeUpdate(stmt, sql);
+ } else {
+ if (tableExists(table)) {
+ dropTable(stmt, table);
+ }
+ }
}
public void dropTable(TableIdentifier table) throws SQLException
{
Statement stmt = connection.createStatement();
@@ -121,68 +132,90 @@
{
String sql = String.format("DROP TABLE %s", quoteTableIdentifier(table));
executeUpdate(stmt, sql);
}
- public void createTableIfNotExists(TableIdentifier targetTable, JdbcSchema schema) throws SQLException
+ public void createTableIfNotExists(TableIdentifier table, JdbcSchema schema,
+ Optional<String> tableConstraint, Optional<String> tableOption) throws SQLException
{
- Statement stmt = connection.createStatement();
- try {
- String sql = buildCreateTableIfNotExistsSql(targetTable, schema);
- executeUpdate(stmt, sql);
- commitIfNecessary(connection);
- } catch (SQLException ex) {
- throw safeRollback(connection, ex);
- } finally {
- stmt.close();
+ if (supportsTableIfExistsClause()) {
+ Statement stmt = connection.createStatement();
+ try {
+ String sql = buildCreateTableIfNotExistsSql(table, schema, tableConstraint, tableOption);
+ executeUpdate(stmt, sql);
+ commitIfNecessary(connection);
+ } catch (SQLException ex) {
+ throw safeRollback(connection, ex);
+ } finally {
+ stmt.close();
+ }
+ } else {
+ if (!tableExists(table)) {
+ createTable(table, schema, tableConstraint, tableOption);
+ }
}
}
- protected String buildCreateTableIfNotExistsSql(TableIdentifier table, JdbcSchema schema)
+ protected String buildCreateTableIfNotExistsSql(TableIdentifier table, JdbcSchema schema,
+ Optional<String> tableConstraint, Optional<String> tableOption)
{
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
quoteTableIdentifier(sb, table);
- sb.append(buildCreateTableSchemaSql(schema));
+ sb.append(buildCreateTableSchemaSql(schema, tableConstraint));
+ if (tableOption.isPresent()) {
+ sb.append(" ");
+ sb.append(tableOption.get());
+ }
return sb.toString();
}
- public void createTable(TableIdentifier table, JdbcSchema schema) throws SQLException
+ public void createTable(TableIdentifier table, JdbcSchema schema,
+ Optional<String> tableConstraint, Optional<String> tableOption) throws SQLException
{
Statement stmt = connection.createStatement();
try {
- String sql = buildCreateTableSql(table, schema);
+ String sql = buildCreateTableSql(table, schema, tableConstraint, tableOption);
executeUpdate(stmt, sql);
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();
}
}
- protected String buildCreateTableSql(TableIdentifier table, JdbcSchema schema)
+ protected String buildCreateTableSql(TableIdentifier table, JdbcSchema schema,
+ Optional<String> tableConstraint, Optional<String> tableOption)
{
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE ");
quoteTableIdentifier(sb, table);
- sb.append(buildCreateTableSchemaSql(schema));
+ sb.append(buildCreateTableSchemaSql(schema, tableConstraint));
+ if (tableOption.isPresent()) {
+ sb.append(" ");
+ sb.append(tableOption.get());
+ }
return sb.toString();
}
- protected String buildCreateTableSchemaSql(JdbcSchema schema)
+ protected String buildCreateTableSchemaSql(JdbcSchema schema, Optional<String> tableConstraint)
{
StringBuilder sb = new StringBuilder();
sb.append(" (");
- for (int i=0; i < schema.getCount(); i++) {
+ for (int i = 0; i < schema.getCount(); i++) {
if (i != 0) { sb.append(", "); }
quoteIdentifierString(sb, schema.getColumnName(i));
sb.append(" ");
String typeName = getCreateTableTypeName(schema.getColumn(i));
sb.append(typeName);
+ }
+ if (tableConstraint.isPresent()) {
+ sb.append(", ");
+ sb.append(tableConstraint.get());
}
sb.append(")");
return sb.toString();
}