src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.12 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.7.13

- old
+ new

@@ -83,10 +83,15 @@ public boolean tableExists(String tableName) throws SQLException { return tableExists(new TableIdentifier(null, schemaName, tableName)); } + protected boolean supportsTableIfExistsClause() + { + return true; + } + public void dropTableIfExists(TableIdentifier table) throws SQLException { Statement stmt = connection.createStatement(); try { dropTableIfExists(stmt, table); @@ -98,12 +103,18 @@ } } protected void dropTableIfExists(Statement stmt, TableIdentifier table) throws SQLException { - String sql = String.format("DROP TABLE IF EXISTS %s", quoteTableIdentifier(table)); - executeUpdate(stmt, sql); + if (supportsTableIfExistsClause()) { + String sql = String.format("DROP TABLE IF EXISTS %s", quoteTableIdentifier(table)); + executeUpdate(stmt, sql); + } else { + if (tableExists(table)) { + dropTable(stmt, table); + } + } } public void dropTable(TableIdentifier table) throws SQLException { Statement stmt = connection.createStatement(); @@ -121,68 +132,90 @@ { String sql = String.format("DROP TABLE %s", quoteTableIdentifier(table)); executeUpdate(stmt, sql); } - public void createTableIfNotExists(TableIdentifier targetTable, JdbcSchema schema) throws SQLException + public void createTableIfNotExists(TableIdentifier table, JdbcSchema schema, + Optional<String> tableConstraint, Optional<String> tableOption) throws SQLException { - Statement stmt = connection.createStatement(); - try { - String sql = buildCreateTableIfNotExistsSql(targetTable, schema); - executeUpdate(stmt, sql); - commitIfNecessary(connection); - } catch (SQLException ex) { - throw safeRollback(connection, ex); - } finally { - stmt.close(); + if (supportsTableIfExistsClause()) { + Statement stmt = connection.createStatement(); + try { + String sql = buildCreateTableIfNotExistsSql(table, schema, tableConstraint, tableOption); + executeUpdate(stmt, sql); + commitIfNecessary(connection); + } catch (SQLException ex) { + throw safeRollback(connection, ex); + } finally { + stmt.close(); + } + } else { + if (!tableExists(table)) { + createTable(table, schema, tableConstraint, tableOption); + } } } - protected String buildCreateTableIfNotExistsSql(TableIdentifier table, JdbcSchema schema) + protected String buildCreateTableIfNotExistsSql(TableIdentifier table, JdbcSchema schema, + Optional<String> tableConstraint, Optional<String> tableOption) { StringBuilder sb = new StringBuilder(); sb.append("CREATE TABLE IF NOT EXISTS "); quoteTableIdentifier(sb, table); - sb.append(buildCreateTableSchemaSql(schema)); + sb.append(buildCreateTableSchemaSql(schema, tableConstraint)); + if (tableOption.isPresent()) { + sb.append(" "); + sb.append(tableOption.get()); + } return sb.toString(); } - public void createTable(TableIdentifier table, JdbcSchema schema) throws SQLException + public void createTable(TableIdentifier table, JdbcSchema schema, + Optional<String> tableConstraint, Optional<String> tableOption) throws SQLException { Statement stmt = connection.createStatement(); try { - String sql = buildCreateTableSql(table, schema); + String sql = buildCreateTableSql(table, schema, tableConstraint, tableOption); executeUpdate(stmt, sql); commitIfNecessary(connection); } catch (SQLException ex) { throw safeRollback(connection, ex); } finally { stmt.close(); } } - protected String buildCreateTableSql(TableIdentifier table, JdbcSchema schema) + protected String buildCreateTableSql(TableIdentifier table, JdbcSchema schema, + Optional<String> tableConstraint, Optional<String> tableOption) { StringBuilder sb = new StringBuilder(); sb.append("CREATE TABLE "); quoteTableIdentifier(sb, table); - sb.append(buildCreateTableSchemaSql(schema)); + sb.append(buildCreateTableSchemaSql(schema, tableConstraint)); + if (tableOption.isPresent()) { + sb.append(" "); + sb.append(tableOption.get()); + } return sb.toString(); } - protected String buildCreateTableSchemaSql(JdbcSchema schema) + protected String buildCreateTableSchemaSql(JdbcSchema schema, Optional<String> tableConstraint) { StringBuilder sb = new StringBuilder(); sb.append(" ("); - for (int i=0; i < schema.getCount(); i++) { + for (int i = 0; i < schema.getCount(); i++) { if (i != 0) { sb.append(", "); } quoteIdentifierString(sb, schema.getColumnName(i)); sb.append(" "); String typeName = getCreateTableTypeName(schema.getColumn(i)); sb.append(typeName); + } + if (tableConstraint.isPresent()) { + sb.append(", "); + sb.append(tableConstraint.get()); } sb.append(")"); return sb.toString(); }