src/main/java/org/embulk/input/mysql/MySQLInputConnection.java in embulk-input-mysql-0.7.2 vs src/main/java/org/embulk/input/mysql/MySQLInputConnection.java in embulk-input-mysql-0.7.3

- old
+ new

@@ -1,12 +1,15 @@ package org.embulk.input.mysql; +import java.util.List; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.ResultSet; import org.embulk.input.jdbc.JdbcInputConnection; +import org.embulk.input.jdbc.JdbcLiteral; +import org.embulk.input.jdbc.getter.ColumnGetter; public class MySQLInputConnection extends JdbcInputConnection { public MySQLInputConnection(Connection connection) @@ -14,13 +17,22 @@ { super(connection, null); } @Override - protected BatchSelect newBatchSelect(String select, int fetchRows, int queryTimeout) throws SQLException + protected BatchSelect newBatchSelect(PreparedQuery preparedQuery, + List<ColumnGetter> getters, + int fetchRows, int queryTimeout) throws SQLException { - logger.info("SQL: " + select); - PreparedStatement stmt = connection.prepareStatement(select, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); // TYPE_FORWARD_ONLY and CONCUR_READ_ONLY are default + String query = preparedQuery.getQuery(); + List<JdbcLiteral> params = preparedQuery.getParameters(); + + logger.info("SQL: " + query); + PreparedStatement stmt = connection.prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); // TYPE_FORWARD_ONLY and CONCUR_READ_ONLY are default + if (!params.isEmpty()) { + logger.info("Parameters: {}", params); + prepareParameters(stmt, getters, params); + } if (fetchRows == 1) { // See MySQLInputPlugin.newConnection doesn't set useCursorFetch=true when fetchRows=1 // MySQL Connector/J keeps the connection opened and process rows one by one with Integer.MIN_VALUE. stmt.setFetchSize(Integer.MIN_VALUE); } else if (fetchRows <= 0) {