src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.6.1 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.6.2

- old
+ new

@@ -7,10 +7,11 @@ import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; + import org.slf4j.Logger; import com.google.common.base.Optional; import org.embulk.spi.Exec; public class JdbcOutputConnection @@ -238,15 +239,15 @@ } return ColumnDeclareType.SIMPLE; } - public PreparedStatement prepareBatchInsertStatement(String toTable, JdbcSchema toTableSchema, Optional<List<String>> mergeKeys) throws SQLException + public PreparedStatement prepareBatchInsertStatement(String toTable, JdbcSchema toTableSchema, Optional<MergeConfig> mergeConfig) throws SQLException { String sql; - if (mergeKeys.isPresent()) { - sql = buildPreparedMergeSql(toTable, toTableSchema, mergeKeys.get()); + if (mergeConfig.isPresent()) { + sql = buildPreparedMergeSql(toTable, toTableSchema, mergeConfig.get()); } else { sql = buildPreparedInsertSql(toTable, toTableSchema); } logger.info("Prepared SQL: {}", sql); return connection.prepareStatement(sql); @@ -272,11 +273,11 @@ sb.append(")"); return sb.toString(); } - protected String buildPreparedMergeSql(String toTable, JdbcSchema toTableSchema, List<String> mergeKeys) throws SQLException + protected String buildPreparedMergeSql(String toTable, JdbcSchema toTableSchema, MergeConfig mergeConfig) throws SQLException { throw new UnsupportedOperationException("not implemented"); } protected void collectInsert(List<String> fromTables, JdbcSchema schema, String toTable, @@ -332,24 +333,24 @@ } return sb.toString(); } - protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException + protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException { Statement stmt = connection.createStatement(); try { - String sql = buildCollectMergeSql(fromTables, schema, toTable, mergeKeys); + String sql = buildCollectMergeSql(fromTables, schema, toTable, mergeConfig); executeUpdate(stmt, sql); commitIfNecessary(connection); } catch (SQLException ex) { throw safeRollback(connection, ex); } finally { stmt.close(); } } - protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException + protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, MergeConfig mergeConfig) throws SQLException { throw new UnsupportedOperationException("not implemented"); } public void replaceTable(String fromTable, JdbcSchema schema, String toTable) throws SQLException