src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.2.4 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.3.0
- old
+ new
@@ -1,15 +1,18 @@
package org.embulk.output.jdbc;
+import java.util.List;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-
import org.slf4j.Logger;
+import com.google.common.base.Optional;
import org.embulk.spi.Exec;
public class JdbcOutputConnection
implements AutoCloseable
{
@@ -45,10 +48,15 @@
public DatabaseMetaData getMetaData() throws SQLException
{
return databaseMetaData;
}
+ public Charset getTableNameCharset() throws SQLException
+ {
+ return StandardCharsets.UTF_8;
+ }
+
protected void setSearchPath(String schema) throws SQLException
{
Statement stmt = connection.createStatement();
try {
String sql = "SET search_path TO " + quoteIdentifierString(schema);
@@ -122,50 +130,24 @@
{
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE IF NOT EXISTS ");
quoteIdentifierString(sb, name);
- sb.append(buildColumnsOfCreateTableSql(schema));
+ sb.append(buildCreateTableSchemaSql(schema));
return sb.toString();
}
- public void createTable(String tableName, JdbcSchema schema) throws SQLException
+ protected String buildCreateTableSchemaSql(JdbcSchema schema)
{
- Statement stmt = connection.createStatement();
- try {
- String sql = buildCreateTableSql(tableName, schema);
- executeUpdate(stmt, sql);
- commitIfNecessary(connection);
- } catch (SQLException ex) {
- throw safeRollback(connection, ex);
- } finally {
- stmt.close();
- }
- }
-
- protected String buildCreateTableSql(String name, JdbcSchema schema)
- {
StringBuilder sb = new StringBuilder();
- sb.append("CREATE TABLE ");
- quoteIdentifierString(sb, name);
- sb.append(buildColumnsOfCreateTableSql(schema));
- return sb.toString();
- }
-
- private String buildColumnsOfCreateTableSql(JdbcSchema schema)
- {
- StringBuilder sb = new StringBuilder();
-
sb.append(" (");
- boolean first = true;
- for (JdbcColumn c : schema.getColumns()) {
- if (first) { first = false; }
- else { sb.append(", "); }
- quoteIdentifierString(sb, c.getName());
+ for (int i=0; i < schema.getCount(); i++) {
+ if (i != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, schema.getColumnName(i));
sb.append(" ");
- String typeName = getCreateTableTypeName(c);
+ String typeName = getCreateTableTypeName(schema.getColumn(i));
sb.append(typeName);
}
sb.append(")");
return sb.toString();
@@ -179,37 +161,40 @@
SIZE_AND_OPTIONAL_SCALE,
};
protected String getCreateTableTypeName(JdbcColumn c)
{
- String convertedTypeName = convertTypeName(c.getTypeName());
- switch (getColumnDeclareType(convertedTypeName, c)) {
+ if (c.getDeclaredType().isPresent()) {
+ return c.getDeclaredType().get();
+ } else {
+ return buildColumnTypeName(c);
+ }
+ }
+
+ protected String buildColumnTypeName(JdbcColumn c)
+ {
+ String simpleTypeName = c.getSimpleTypeName();
+ switch (getColumnDeclareType(simpleTypeName, c)) {
case SIZE:
- return String.format("%s(%d)", convertedTypeName, c.getSizeTypeParameter());
+ return String.format("%s(%d)", simpleTypeName, c.getSizeTypeParameter());
case SIZE_AND_SCALE:
if (c.getScaleTypeParameter() < 0) {
- return String.format("%s(%d,0)", convertedTypeName, c.getSizeTypeParameter());
+ return String.format("%s(%d,0)", simpleTypeName, c.getSizeTypeParameter());
} else {
- return String.format("%s(%d,%d)", convertedTypeName, c.getSizeTypeParameter(), c.getScaleTypeParameter());
+ return String.format("%s(%d,%d)", simpleTypeName, c.getSizeTypeParameter(), c.getScaleTypeParameter());
}
case SIZE_AND_OPTIONAL_SCALE:
if (c.getScaleTypeParameter() < 0) {
- return String.format("%s(%d)", convertedTypeName, c.getSizeTypeParameter());
+ return String.format("%s(%d)", simpleTypeName, c.getSizeTypeParameter());
} else {
- return String.format("%s(%d,%d)", convertedTypeName, c.getSizeTypeParameter(), c.getScaleTypeParameter());
+ return String.format("%s(%d,%d)", simpleTypeName, c.getSizeTypeParameter(), c.getScaleTypeParameter());
}
default: // SIMPLE
- return convertedTypeName;
+ return simpleTypeName;
}
}
- // hook point for subclasses
- protected String convertTypeName(String typeName)
- {
- return typeName;
- }
-
// TODO
private static final String[] STANDARD_SIZE_TYPE_NAMES = new String[] {
"CHAR",
"VARCHAR", "CHAR VARYING", "CHARACTER VARYING", "LONGVARCHAR",
"NCHAR",
@@ -240,125 +225,121 @@
}
return ColumnDeclareType.SIMPLE;
}
- protected String buildInsertTableSql(String fromTable, JdbcSchema fromTableSchema, String toTable)
+ public PreparedStatement prepareBatchInsertStatement(String toTable, JdbcSchema toTableSchema, Optional<List<String>> mergeKeys) throws SQLException
{
+ String sql;
+ if (mergeKeys.isPresent()) {
+ sql = buildPreparedMergeSql(toTable, toTableSchema, mergeKeys.get());
+ } else {
+ sql = buildPreparedInsertSql(toTable, toTableSchema);
+ }
+ logger.info("Prepared SQL: {}", sql);
+ return connection.prepareStatement(sql);
+ }
+
+ protected String buildPreparedInsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException
+ {
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO ");
quoteIdentifierString(sb, toTable);
+
sb.append(" (");
- boolean first = true;
- for (JdbcColumn c : fromTableSchema.getColumns()) {
- if (first) { first = false; }
- else { sb.append(", "); }
- quoteIdentifierString(sb, c.getName());
+ for (int i=0; i < toTableSchema.getCount(); i++) {
+ if(i != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, toTableSchema.getColumnName(i));
}
- sb.append(") ");
- sb.append("SELECT ");
- for (JdbcColumn c : fromTableSchema.getColumns()) {
- if (first) { first = false; }
- else { sb.append(", "); }
- quoteIdentifierString(sb, c.getName());
+ sb.append(") VALUES (");
+ for(int i=0; i < toTableSchema.getCount(); i++) {
+ if(i != 0) { sb.append(", "); }
+ sb.append("?");
}
- sb.append(" FROM ");
- quoteIdentifierString(sb, fromTable);
+ sb.append(")");
return sb.toString();
}
- protected String buildTruncateSql(String table)
+ protected String buildPreparedMergeSql(String toTable, JdbcSchema toTableSchema, List<String> mergeKeys) throws SQLException
{
- StringBuilder sb = new StringBuilder();
-
- sb.append("DELETE FROM ");
- quoteIdentifierString(sb, table);
-
- return sb.toString();
+ throw new UnsupportedOperationException("not implemented");
}
- protected void insertTable(String fromTable, JdbcSchema fromTableSchema, String toTable,
+ protected void collectInsert(List<String> fromTables, JdbcSchema schema, String toTable,
boolean truncateDestinationFirst) throws SQLException
{
Statement stmt = connection.createStatement();
try {
- if(truncateDestinationFirst) {
+ if (truncateDestinationFirst) {
String sql = buildTruncateSql(toTable);
executeUpdate(stmt, sql);
}
- String sql = buildInsertTableSql(fromTable, fromTableSchema, toTable);
+ String sql = buildCollectInsertSql(fromTables, schema, toTable);
executeUpdate(stmt, sql);
commitIfNecessary(connection);
} catch (SQLException ex) {
- connection.rollback();
- throw ex;
+ throw safeRollback(connection, ex);
} finally {
stmt.close();
}
}
- public PreparedStatement prepareInsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException
+ protected String buildTruncateSql(String table)
{
- String insertSql = buildPrepareInsertSql(toTable, toTableSchema);
- logger.info("Prepared SQL: {}", insertSql);
- return connection.prepareStatement(insertSql);
- }
+ StringBuilder sb = new StringBuilder();
- public PreparedStatement prepareUpsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException
- {
- String upsertSql = buildPrepareUpsertSql(toTable, toTableSchema);
- logger.info("Prepared SQL: {}", upsertSql);
- return connection.prepareStatement(upsertSql);
+ sb.append("DELETE FROM ");
+ quoteIdentifierString(sb, table);
+
+ return sb.toString();
}
- protected String buildPrepareInsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException
+ protected String buildCollectInsertSql(List<String> fromTables, JdbcSchema schema, String toTable)
{
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO ");
quoteIdentifierString(sb, toTable);
-
sb.append(" (");
- for (int i=0; i < toTableSchema.getCount(); i++) {
- if(i != 0) { sb.append(", "); }
- quoteIdentifierString(sb, toTableSchema.getColumnName(i));
+ for (int i=0; i < schema.getCount(); i++) {
+ if (i != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, schema.getColumnName(i));
}
- sb.append(") VALUES (");
- for(int i=0; i < toTableSchema.getCount(); i++) {
- if(i != 0) { sb.append(", "); }
- sb.append("?");
+ sb.append(") ");
+ for (int i=0; i < fromTables.size(); i++) {
+ if (i != 0) { sb.append(" UNION ALL "); }
+ sb.append("SELECT ");
+ for (int j=0; j < schema.getCount(); j++) {
+ if (j != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, schema.getColumnName(j));
+ }
+ sb.append(" FROM ");
+ quoteIdentifierString(sb, fromTables.get(i));
}
- sb.append(")");
return sb.toString();
}
- protected String buildPrepareUpsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException
+ protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException
{
- throw new UnsupportedOperationException("not implemented yet");
+ Statement stmt = connection.createStatement();
+ try {
+ String sql = buildCollectMergeSql(fromTables, schema, toTable, mergeKeys);
+ executeUpdate(stmt, sql);
+ commitIfNecessary(connection);
+ } catch (SQLException ex) {
+ throw safeRollback(connection, ex);
+ } finally {
+ stmt.close();
+ }
}
- // TODO
- //protected void gatherInsertTables(List<String> fromTables, JdbcSchema fromTableSchema, String toTable,
- // boolean truncateDestinationFirst) throws SQLException
- //{
- // Statement stmt = connection.createStatement();
- // try {
- // if(truncateDestinationFirst) {
- // String sql = buildTruncateSql(toTable);
- // executeUpdate(stmt, sql);
- // }
- // String sql = buildGatherInsertTables(fromTable, fromTableSchema, toTable);
- // executeUpdate(stmt, sql);
- // commitIfNecessary(connection);
- // } catch (SQLException ex) {
- // throw safeRollback(connection, ex);
- // } finally {
- // stmt.close();
- // }
- //}
+ protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException
+ {
+ throw new UnsupportedOperationException("not implemented");
+ }
public void replaceTable(String fromTable, JdbcSchema schema, String toTable) throws SQLException
{
Statement stmt = connection.createStatement();
try {