package org.embulk.input.jdbc; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.Statement; import java.sql.SQLException; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import org.slf4j.Logger; import org.embulk.spi.Exec; public class JdbcInputConnection implements AutoCloseable { protected final Logger logger = Exec.getLogger(getClass()); protected final Connection connection; protected final String schemaName; protected final DatabaseMetaData databaseMetaData; protected String identifierQuoteString; public JdbcInputConnection(Connection connection, String schemaName) throws SQLException { this.connection = connection; this.schemaName = schemaName; this.databaseMetaData = connection.getMetaData(); this.identifierQuoteString = databaseMetaData.getIdentifierQuoteString(); if (schemaName != null) { setSearchPath(schemaName); } connection.setAutoCommit(false); } protected void setSearchPath(String schema) throws SQLException { String sql = "SET search_path TO " + quoteIdentifierString(schema); executeUpdate(sql); } public JdbcSchema getSchemaOfQuery(String query) throws SQLException { PreparedStatement stmt = connection.prepareStatement(query); try { return getSchemaOfResultMetadata(stmt.getMetaData()); } finally { stmt.close(); } } protected JdbcSchema getSchemaOfResultMetadata(ResultSetMetaData metadata) throws SQLException { ImmutableList.Builder columns = ImmutableList.builder(); for (int i=0; i < metadata.getColumnCount(); i++) { int index = i + 1; // JDBC column index begins from 1 String name = metadata.getColumnLabel(index); String typeName = metadata.getColumnTypeName(index); int sqlType = metadata.getColumnType(index); int scale = metadata.getScale(index); int precision = metadata.getPrecision(index); columns.add(new JdbcColumn(name, typeName, sqlType, precision, scale)); } return new JdbcSchema(columns.build()); } public BatchSelect newSelectCursor(String query, int fetchRows, int queryTimeout) throws SQLException { return newBatchSelect(query, fetchRows, queryTimeout); } protected BatchSelect newBatchSelect(String query, int fetchRows, int queryTimeout) throws SQLException { logger.info("SQL: " + query); PreparedStatement stmt = connection.prepareStatement(query); stmt.setFetchSize(fetchRows); stmt.setQueryTimeout(queryTimeout); return new SingleSelect(stmt); } public interface BatchSelect extends AutoCloseable { public ResultSet fetch() throws SQLException; @Override public void close() throws SQLException; } public class SingleSelect implements BatchSelect { private final PreparedStatement fetchStatement; private boolean fetched = false; public SingleSelect(PreparedStatement fetchStatement) throws SQLException { this.fetchStatement = fetchStatement; } public ResultSet fetch() throws SQLException { if (fetched == true) { return null; } long startTime = System.currentTimeMillis(); ResultSet rs = fetchStatement.executeQuery(); double seconds = (System.currentTimeMillis() - startTime) / 1000.0; logger.info(String.format("> %.2f seconds", seconds)); fetched = true; return rs; } public void close() throws SQLException { // TODO close? } } @Override public void close() throws SQLException { connection.close(); } protected void executeUpdate(String sql) throws SQLException { logger.info("SQL: " + sql); Statement stmt = connection.createStatement(); try { stmt.executeUpdate(sql); } finally { stmt.close(); } } // TODO share code with embulk-output-jdbc protected String quoteIdentifierString(String str) { return identifierQuoteString + str + identifierQuoteString; } public String buildSelectQuery(String tableName, Optional selectColumnList, Optional whereCondition, Optional orderByColumn) { StringBuilder sb = new StringBuilder(); sb.append("SELECT "); sb.append(selectColumnList.or("*")); sb.append(" FROM ").append(quoteIdentifierString(tableName)); if (whereCondition.isPresent()) { sb.append(" WHERE ").append(whereCondition.get()); } if (orderByColumn.isPresent()) { sb.append("ORDER BY ").append(quoteIdentifierString(orderByColumn.get())).append(" ASC"); } return sb.toString(); } }