src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.2.4 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.3.0

- old
+ new

@@ -1,15 +1,18 @@ package org.embulk.output.jdbc; +import java.util.List; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; - import org.slf4j.Logger; +import com.google.common.base.Optional; import org.embulk.spi.Exec; public class JdbcOutputConnection implements AutoCloseable { @@ -45,10 +48,15 @@ public DatabaseMetaData getMetaData() throws SQLException { return databaseMetaData; } + public Charset getTableNameCharset() throws SQLException + { + return StandardCharsets.UTF_8; + } + protected void setSearchPath(String schema) throws SQLException { Statement stmt = connection.createStatement(); try { String sql = "SET search_path TO " + quoteIdentifierString(schema); @@ -122,50 +130,24 @@ { StringBuilder sb = new StringBuilder(); sb.append("CREATE TABLE IF NOT EXISTS "); quoteIdentifierString(sb, name); - sb.append(buildColumnsOfCreateTableSql(schema)); + sb.append(buildCreateTableSchemaSql(schema)); return sb.toString(); } - public void createTable(String tableName, JdbcSchema schema) throws SQLException + protected String buildCreateTableSchemaSql(JdbcSchema schema) { - Statement stmt = connection.createStatement(); - try { - String sql = buildCreateTableSql(tableName, schema); - executeUpdate(stmt, sql); - commitIfNecessary(connection); - } catch (SQLException ex) { - throw safeRollback(connection, ex); - } finally { - stmt.close(); - } - } - - protected String buildCreateTableSql(String name, JdbcSchema schema) - { StringBuilder sb = new StringBuilder(); - sb.append("CREATE TABLE "); - quoteIdentifierString(sb, name); - sb.append(buildColumnsOfCreateTableSql(schema)); - return sb.toString(); - } - - private String buildColumnsOfCreateTableSql(JdbcSchema schema) - { - StringBuilder sb = new StringBuilder(); - sb.append(" ("); - boolean first = true; - for (JdbcColumn c : schema.getColumns()) { - if (first) { first = false; } - else { sb.append(", "); } - quoteIdentifierString(sb, c.getName()); + for (int i=0; i < schema.getCount(); i++) { + if (i != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(i)); sb.append(" "); - String typeName = getCreateTableTypeName(c); + String typeName = getCreateTableTypeName(schema.getColumn(i)); sb.append(typeName); } sb.append(")"); return sb.toString(); @@ -179,37 +161,40 @@ SIZE_AND_OPTIONAL_SCALE, }; protected String getCreateTableTypeName(JdbcColumn c) { - String convertedTypeName = convertTypeName(c.getTypeName()); - switch (getColumnDeclareType(convertedTypeName, c)) { + if (c.getDeclaredType().isPresent()) { + return c.getDeclaredType().get(); + } else { + return buildColumnTypeName(c); + } + } + + protected String buildColumnTypeName(JdbcColumn c) + { + String simpleTypeName = c.getSimpleTypeName(); + switch (getColumnDeclareType(simpleTypeName, c)) { case SIZE: - return String.format("%s(%d)", convertedTypeName, c.getSizeTypeParameter()); + return String.format("%s(%d)", simpleTypeName, c.getSizeTypeParameter()); case SIZE_AND_SCALE: if (c.getScaleTypeParameter() < 0) { - return String.format("%s(%d,0)", convertedTypeName, c.getSizeTypeParameter()); + return String.format("%s(%d,0)", simpleTypeName, c.getSizeTypeParameter()); } else { - return String.format("%s(%d,%d)", convertedTypeName, c.getSizeTypeParameter(), c.getScaleTypeParameter()); + return String.format("%s(%d,%d)", simpleTypeName, c.getSizeTypeParameter(), c.getScaleTypeParameter()); } case SIZE_AND_OPTIONAL_SCALE: if (c.getScaleTypeParameter() < 0) { - return String.format("%s(%d)", convertedTypeName, c.getSizeTypeParameter()); + return String.format("%s(%d)", simpleTypeName, c.getSizeTypeParameter()); } else { - return String.format("%s(%d,%d)", convertedTypeName, c.getSizeTypeParameter(), c.getScaleTypeParameter()); + return String.format("%s(%d,%d)", simpleTypeName, c.getSizeTypeParameter(), c.getScaleTypeParameter()); } default: // SIMPLE - return convertedTypeName; + return simpleTypeName; } } - // hook point for subclasses - protected String convertTypeName(String typeName) - { - return typeName; - } - // TODO private static final String[] STANDARD_SIZE_TYPE_NAMES = new String[] { "CHAR", "VARCHAR", "CHAR VARYING", "CHARACTER VARYING", "LONGVARCHAR", "NCHAR", @@ -240,125 +225,121 @@ } return ColumnDeclareType.SIMPLE; } - protected String buildInsertTableSql(String fromTable, JdbcSchema fromTableSchema, String toTable) + public PreparedStatement prepareBatchInsertStatement(String toTable, JdbcSchema toTableSchema, Optional<List<String>> mergeKeys) throws SQLException { + String sql; + if (mergeKeys.isPresent()) { + sql = buildPreparedMergeSql(toTable, toTableSchema, mergeKeys.get()); + } else { + sql = buildPreparedInsertSql(toTable, toTableSchema); + } + logger.info("Prepared SQL: {}", sql); + return connection.prepareStatement(sql); + } + + protected String buildPreparedInsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException + { StringBuilder sb = new StringBuilder(); sb.append("INSERT INTO "); quoteIdentifierString(sb, toTable); + sb.append(" ("); - boolean first = true; - for (JdbcColumn c : fromTableSchema.getColumns()) { - if (first) { first = false; } - else { sb.append(", "); } - quoteIdentifierString(sb, c.getName()); + for (int i=0; i < toTableSchema.getCount(); i++) { + if(i != 0) { sb.append(", "); } + quoteIdentifierString(sb, toTableSchema.getColumnName(i)); } - sb.append(") "); - sb.append("SELECT "); - for (JdbcColumn c : fromTableSchema.getColumns()) { - if (first) { first = false; } - else { sb.append(", "); } - quoteIdentifierString(sb, c.getName()); + sb.append(") VALUES ("); + for(int i=0; i < toTableSchema.getCount(); i++) { + if(i != 0) { sb.append(", "); } + sb.append("?"); } - sb.append(" FROM "); - quoteIdentifierString(sb, fromTable); + sb.append(")"); return sb.toString(); } - protected String buildTruncateSql(String table) + protected String buildPreparedMergeSql(String toTable, JdbcSchema toTableSchema, List<String> mergeKeys) throws SQLException { - StringBuilder sb = new StringBuilder(); - - sb.append("DELETE FROM "); - quoteIdentifierString(sb, table); - - return sb.toString(); + throw new UnsupportedOperationException("not implemented"); } - protected void insertTable(String fromTable, JdbcSchema fromTableSchema, String toTable, + protected void collectInsert(List<String> fromTables, JdbcSchema schema, String toTable, boolean truncateDestinationFirst) throws SQLException { Statement stmt = connection.createStatement(); try { - if(truncateDestinationFirst) { + if (truncateDestinationFirst) { String sql = buildTruncateSql(toTable); executeUpdate(stmt, sql); } - String sql = buildInsertTableSql(fromTable, fromTableSchema, toTable); + String sql = buildCollectInsertSql(fromTables, schema, toTable); executeUpdate(stmt, sql); commitIfNecessary(connection); } catch (SQLException ex) { - connection.rollback(); - throw ex; + throw safeRollback(connection, ex); } finally { stmt.close(); } } - public PreparedStatement prepareInsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException + protected String buildTruncateSql(String table) { - String insertSql = buildPrepareInsertSql(toTable, toTableSchema); - logger.info("Prepared SQL: {}", insertSql); - return connection.prepareStatement(insertSql); - } + StringBuilder sb = new StringBuilder(); - public PreparedStatement prepareUpsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException - { - String upsertSql = buildPrepareUpsertSql(toTable, toTableSchema); - logger.info("Prepared SQL: {}", upsertSql); - return connection.prepareStatement(upsertSql); + sb.append("DELETE FROM "); + quoteIdentifierString(sb, table); + + return sb.toString(); } - protected String buildPrepareInsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException + protected String buildCollectInsertSql(List<String> fromTables, JdbcSchema schema, String toTable) { StringBuilder sb = new StringBuilder(); sb.append("INSERT INTO "); quoteIdentifierString(sb, toTable); - sb.append(" ("); - for (int i=0; i < toTableSchema.getCount(); i++) { - if(i != 0) { sb.append(", "); } - quoteIdentifierString(sb, toTableSchema.getColumnName(i)); + for (int i=0; i < schema.getCount(); i++) { + if (i != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(i)); } - sb.append(") VALUES ("); - for(int i=0; i < toTableSchema.getCount(); i++) { - if(i != 0) { sb.append(", "); } - sb.append("?"); + sb.append(") "); + for (int i=0; i < fromTables.size(); i++) { + if (i != 0) { sb.append(" UNION ALL "); } + sb.append("SELECT "); + for (int j=0; j < schema.getCount(); j++) { + if (j != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(j)); + } + sb.append(" FROM "); + quoteIdentifierString(sb, fromTables.get(i)); } - sb.append(")"); return sb.toString(); } - protected String buildPrepareUpsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException + protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException { - throw new UnsupportedOperationException("not implemented yet"); + Statement stmt = connection.createStatement(); + try { + String sql = buildCollectMergeSql(fromTables, schema, toTable, mergeKeys); + executeUpdate(stmt, sql); + commitIfNecessary(connection); + } catch (SQLException ex) { + throw safeRollback(connection, ex); + } finally { + stmt.close(); + } } - // TODO - //protected void gatherInsertTables(List<String> fromTables, JdbcSchema fromTableSchema, String toTable, - // boolean truncateDestinationFirst) throws SQLException - //{ - // Statement stmt = connection.createStatement(); - // try { - // if(truncateDestinationFirst) { - // String sql = buildTruncateSql(toTable); - // executeUpdate(stmt, sql); - // } - // String sql = buildGatherInsertTables(fromTable, fromTableSchema, toTable); - // executeUpdate(stmt, sql); - // commitIfNecessary(connection); - // } catch (SQLException ex) { - // throw safeRollback(connection, ex); - // } finally { - // stmt.close(); - // } - //} + protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException + { + throw new UnsupportedOperationException("not implemented"); + } public void replaceTable(String fromTable, JdbcSchema schema, String toTable) throws SQLException { Statement stmt = connection.createStatement(); try {