src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.4.1 vs src/main/java/org/embulk/output/jdbc/JdbcOutputConnection.java in embulk-output-jdbc-0.4.2

- old
+ new

@@ -1,492 +1,492 @@ -package org.embulk.output.jdbc; - -import java.util.List; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import org.slf4j.Logger; -import com.google.common.base.Optional; -import org.embulk.spi.Exec; - -public class JdbcOutputConnection - implements AutoCloseable -{ - private final Logger logger = Exec.getLogger(JdbcOutputConnection.class); - protected final Connection connection; - protected final String schemaName; - protected final DatabaseMetaData databaseMetaData; - protected String identifierQuoteString; - - public JdbcOutputConnection(Connection connection, String schemaName) - throws SQLException - { - this.connection = connection; - this.schemaName = schemaName; - this.databaseMetaData = connection.getMetaData(); - this.identifierQuoteString = databaseMetaData.getIdentifierQuoteString(); - if (schemaName != null) { - setSearchPath(schemaName); - } - } - - @Override - public void close() throws SQLException - { - connection.close(); - } - - public String getSchemaName() - { - return schemaName; - } - - public DatabaseMetaData getMetaData() throws SQLException - { - return databaseMetaData; - } - - public Charset getTableNameCharset() throws SQLException - { - return StandardCharsets.UTF_8; - } - - protected void setSearchPath(String schema) throws SQLException - { - Statement stmt = connection.createStatement(); - try { - String sql = "SET search_path TO " + quoteIdentifierString(schema); - executeUpdate(stmt, sql); - commitIfNecessary(connection); - } finally { - stmt.close(); - } - } - - public boolean tableExists(String tableName) throws SQLException - { - try (ResultSet rs = connection.getMetaData().getTables(null, schemaName, tableName, null)) { - return rs.next(); - } - } - - public void dropTableIfExists(String tableName) throws SQLException - { - Statement stmt = connection.createStatement(); - try { - dropTableIfExists(stmt, tableName); - commitIfNecessary(connection); - } catch (SQLException ex) { - throw safeRollback(connection, ex); - } finally { - stmt.close(); - } - } - - protected void dropTableIfExists(Statement stmt, String tableName) throws SQLException - { - String sql = String.format("DROP TABLE IF EXISTS %s", quoteIdentifierString(tableName)); - executeUpdate(stmt, sql); - } - - public void dropTable(String tableName) throws SQLException - { - Statement stmt = connection.createStatement(); - try { - dropTable(stmt, tableName); - commitIfNecessary(connection); - } catch (SQLException ex) { - throw safeRollback(connection, ex); - } finally { - stmt.close(); - } - } - - protected void dropTable(Statement stmt, String tableName) throws SQLException - { - String sql = String.format("DROP TABLE %s", quoteIdentifierString(tableName)); - executeUpdate(stmt, sql); - } - - public void createTableIfNotExists(String tableName, JdbcSchema schema) throws SQLException - { - Statement stmt = connection.createStatement(); - try { - String sql = buildCreateTableIfNotExistsSql(tableName, schema); - executeUpdate(stmt, sql); - commitIfNecessary(connection); - } catch (SQLException ex) { - throw safeRollback(connection, ex); - } finally { - stmt.close(); - } - } - - protected String buildCreateTableIfNotExistsSql(String name, JdbcSchema schema) - { - StringBuilder sb = new StringBuilder(); - - sb.append("CREATE TABLE IF NOT EXISTS "); - quoteIdentifierString(sb, name); - sb.append(buildCreateTableSchemaSql(schema)); - return sb.toString(); - } - - protected String buildCreateTableSchemaSql(JdbcSchema schema) - { - StringBuilder sb = new StringBuilder(); - - sb.append(" ("); - for (int i=0; i < schema.getCount(); i++) { - if (i != 0) { sb.append(", "); } - quoteIdentifierString(sb, schema.getColumnName(i)); - sb.append(" "); - String typeName = getCreateTableTypeName(schema.getColumn(i)); - sb.append(typeName); - } - sb.append(")"); - - return sb.toString(); - } - - public static enum ColumnDeclareType - { - SIMPLE, - SIZE, - SIZE_AND_SCALE, - SIZE_AND_OPTIONAL_SCALE, - }; - - protected String getCreateTableTypeName(JdbcColumn c) - { - if (c.getDeclaredType().isPresent()) { - return c.getDeclaredType().get(); - } else { - return buildColumnTypeName(c); - } - } - - protected String buildColumnTypeName(JdbcColumn c) - { - String simpleTypeName = c.getSimpleTypeName(); - switch (getColumnDeclareType(simpleTypeName, c)) { - case SIZE: - return String.format("%s(%d)", simpleTypeName, c.getSizeTypeParameter()); - case SIZE_AND_SCALE: - if (c.getScaleTypeParameter() < 0) { - return String.format("%s(%d,0)", simpleTypeName, c.getSizeTypeParameter()); - } else { - return String.format("%s(%d,%d)", simpleTypeName, c.getSizeTypeParameter(), c.getScaleTypeParameter()); - } - case SIZE_AND_OPTIONAL_SCALE: - if (c.getScaleTypeParameter() < 0) { - return String.format("%s(%d)", simpleTypeName, c.getSizeTypeParameter()); - } else { - return String.format("%s(%d,%d)", simpleTypeName, c.getSizeTypeParameter(), c.getScaleTypeParameter()); - } - default: // SIMPLE - return simpleTypeName; - } - } - - // TODO - private static final String[] STANDARD_SIZE_TYPE_NAMES = new String[] { - "CHAR", - "VARCHAR", "CHAR VARYING", "CHARACTER VARYING", "LONGVARCHAR", - "NCHAR", - "NVARCHAR", "NCHAR VARYING", "NATIONAL CHAR VARYING", "NATIONAL CHARACTER VARYING", - "BINARY", - "VARBINARY", "BINARY VARYING", "LONGVARBINARY", - "BIT", - "VARBIT", "BIT VARYING", - "FLOAT", // SQL standard's FLOAT[(p)] optionally accepts precision - }; - - private static final String[] STANDARD_SIZE_AND_SCALE_TYPE_NAMES = new String[] { - "DECIMAL", - }; - - protected ColumnDeclareType getColumnDeclareType(String convertedTypeName, JdbcColumn col) - { - for (String x : STANDARD_SIZE_TYPE_NAMES) { - if (x.equals(convertedTypeName)) { - return ColumnDeclareType.SIZE; - } - } - - for (String x : STANDARD_SIZE_AND_SCALE_TYPE_NAMES) { - if (x.equals(convertedTypeName)) { - return ColumnDeclareType.SIZE_AND_SCALE; - } - } - - return ColumnDeclareType.SIMPLE; - } - - public PreparedStatement prepareBatchInsertStatement(String toTable, JdbcSchema toTableSchema, Optional<List<String>> mergeKeys) throws SQLException - { - String sql; - if (mergeKeys.isPresent()) { - sql = buildPreparedMergeSql(toTable, toTableSchema, mergeKeys.get()); - } else { - sql = buildPreparedInsertSql(toTable, toTableSchema); - } - logger.info("Prepared SQL: {}", sql); - return connection.prepareStatement(sql); - } - - protected String buildPreparedInsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException - { - StringBuilder sb = new StringBuilder(); - - sb.append("INSERT INTO "); - quoteIdentifierString(sb, toTable); - - sb.append(" ("); - for (int i=0; i < toTableSchema.getCount(); i++) { - if(i != 0) { sb.append(", "); } - quoteIdentifierString(sb, toTableSchema.getColumnName(i)); - } - sb.append(") VALUES ("); - for(int i=0; i < toTableSchema.getCount(); i++) { - if(i != 0) { sb.append(", "); } - sb.append("?"); - } - sb.append(")"); - - return sb.toString(); - } - - protected String buildPreparedMergeSql(String toTable, JdbcSchema toTableSchema, List<String> mergeKeys) throws SQLException - { - throw new UnsupportedOperationException("not implemented"); - } - - protected void collectInsert(List<String> fromTables, JdbcSchema schema, String toTable, - boolean truncateDestinationFirst) throws SQLException - { - Statement stmt = connection.createStatement(); - try { - if (truncateDestinationFirst) { - String sql = buildTruncateSql(toTable); - executeUpdate(stmt, sql); - } - String sql = buildCollectInsertSql(fromTables, schema, toTable); - executeUpdate(stmt, sql); - commitIfNecessary(connection); - } catch (SQLException ex) { - throw safeRollback(connection, ex); - } finally { - stmt.close(); - } - } - - protected String buildTruncateSql(String table) - { - StringBuilder sb = new StringBuilder(); - - sb.append("DELETE FROM "); - quoteIdentifierString(sb, table); - - return sb.toString(); - } - - protected String buildCollectInsertSql(List<String> fromTables, JdbcSchema schema, String toTable) - { - StringBuilder sb = new StringBuilder(); - - sb.append("INSERT INTO "); - quoteIdentifierString(sb, toTable); - sb.append(" ("); - for (int i=0; i < schema.getCount(); i++) { - if (i != 0) { sb.append(", "); } - quoteIdentifierString(sb, schema.getColumnName(i)); - } - sb.append(") "); - for (int i=0; i < fromTables.size(); i++) { - if (i != 0) { sb.append(" UNION ALL "); } - sb.append("SELECT "); - for (int j=0; j < schema.getCount(); j++) { - if (j != 0) { sb.append(", "); } - quoteIdentifierString(sb, schema.getColumnName(j)); - } - sb.append(" FROM "); - quoteIdentifierString(sb, fromTables.get(i)); - } - - return sb.toString(); - } - - protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException - { - Statement stmt = connection.createStatement(); - try { - String sql = buildCollectMergeSql(fromTables, schema, toTable, mergeKeys); - executeUpdate(stmt, sql); - commitIfNecessary(connection); - } catch (SQLException ex) { - throw safeRollback(connection, ex); - } finally { - stmt.close(); - } - } - - protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException - { - throw new UnsupportedOperationException("not implemented"); - } - - public void replaceTable(String fromTable, JdbcSchema schema, String toTable) throws SQLException - { - Statement stmt = connection.createStatement(); - try { - dropTableIfExists(stmt, toTable); - - StringBuilder sb = new StringBuilder(); - sb.append("ALTER TABLE "); - quoteIdentifierString(sb, fromTable); - sb.append(" RENAME TO "); - quoteIdentifierString(sb, toTable); - String sql = sb.toString(); - executeUpdate(stmt, sql); - - commitIfNecessary(connection); - } catch (SQLException ex) { - throw safeRollback(connection, ex); - } finally { - stmt.close(); - } - } - - protected void quoteIdentifierString(StringBuilder sb, String str) - { - sb.append(quoteIdentifierString(str, identifierQuoteString)); - } - - protected String quoteIdentifierString(String str) - { - return quoteIdentifierString(str, identifierQuoteString); - } - - protected String quoteIdentifierString(String str, String quoteString) - { - // TODO if identifierQuoteString.equals(" ") && str.contains([^a-zA-Z0-9_connection.getMetaData().getExtraNameCharacters()]) - // TODO if str.contains(identifierQuoteString); - return quoteString + str + quoteString; - } - - // PostgreSQL JDBC driver implements isValid() method. But the - // implementation throws following exception: - // "java.io.IOException: Method org.postgresql.jdbc4.Jdbc4Connection.isValid(int) is not yet implemented." - // - // So, checking mechanism doesn't work at all. - // Thus here just runs "SELECT 1" to check connectivity. - // - public boolean isValidConnection(int timeout) throws SQLException - { - Statement stmt = connection.createStatement(); - try { - stmt.executeQuery("SELECT 1").close(); - return true; - } catch (SQLException ex) { - return false; - } finally { - stmt.close(); - } - } - - protected String[] getDeterministicSqlStates() - { - return new String[0]; - } - - protected int[] getDeterministicErrorCodes() - { - return new int[0]; - } - - protected Class[] getDeterministicRootCauses() - { - return new Class[] { - // Don't retry on UnknownHostException. - java.net.UnknownHostException.class, - - //// we should not retry on connect() error? - //java.net.ConnectException.class, - }; - } - - public boolean isRetryableException(SQLException exception) - { - String sqlState = exception.getSQLState(); - for (String deterministic : getDeterministicSqlStates()) { - if (sqlState.equals(deterministic)) { - return false; - } - } - - int errorCode = exception.getErrorCode(); - for (int deterministic : getDeterministicErrorCodes()) { - if (errorCode == deterministic) { - return false; - } - } - - Throwable rootCause = getRootCause(exception); - for (Class deterministic : getDeterministicRootCauses()) { - if (deterministic.equals(rootCause.getClass())) { - return false; - } - } - - return true; - } - - private Throwable getRootCause(Throwable e) { - while (e.getCause() != null) { - e = e.getCause(); - } - return e; - } - - protected int executeUpdate(Statement stmt, String sql) throws SQLException - { - logger.info("SQL: " + sql); - long startTime = System.currentTimeMillis(); - int count = stmt.executeUpdate(sql); - double seconds = (System.currentTimeMillis() - startTime) / 1000.0; - if (count == 0) { - logger.info(String.format("> %.2f seconds", seconds)); - } else { - logger.info(String.format("> %.2f seconds (%,d rows)", seconds, count)); - } - return count; - } - - protected void commitIfNecessary(Connection con) throws SQLException - { - if (!con.getAutoCommit()) { - con.commit(); - } - } - - protected SQLException safeRollback(Connection con, SQLException cause) - { - try { - if (!con.getAutoCommit()) { - con.rollback(); - } - return cause; - } catch (SQLException ex) { - if (cause != null) { - cause.addSuppressed(ex); - return cause; - } - return ex; - } - } -} +package org.embulk.output.jdbc; + +import java.util.List; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.slf4j.Logger; +import com.google.common.base.Optional; +import org.embulk.spi.Exec; + +public class JdbcOutputConnection + implements AutoCloseable +{ + private final Logger logger = Exec.getLogger(JdbcOutputConnection.class); + protected final Connection connection; + protected final String schemaName; + protected final DatabaseMetaData databaseMetaData; + protected String identifierQuoteString; + + public JdbcOutputConnection(Connection connection, String schemaName) + throws SQLException + { + this.connection = connection; + this.schemaName = schemaName; + this.databaseMetaData = connection.getMetaData(); + this.identifierQuoteString = databaseMetaData.getIdentifierQuoteString(); + if (schemaName != null) { + setSearchPath(schemaName); + } + } + + @Override + public void close() throws SQLException + { + connection.close(); + } + + public String getSchemaName() + { + return schemaName; + } + + public DatabaseMetaData getMetaData() throws SQLException + { + return databaseMetaData; + } + + public Charset getTableNameCharset() throws SQLException + { + return StandardCharsets.UTF_8; + } + + protected void setSearchPath(String schema) throws SQLException + { + Statement stmt = connection.createStatement(); + try { + String sql = "SET search_path TO " + quoteIdentifierString(schema); + executeUpdate(stmt, sql); + commitIfNecessary(connection); + } finally { + stmt.close(); + } + } + + public boolean tableExists(String tableName) throws SQLException + { + try (ResultSet rs = connection.getMetaData().getTables(null, schemaName, tableName, null)) { + return rs.next(); + } + } + + public void dropTableIfExists(String tableName) throws SQLException + { + Statement stmt = connection.createStatement(); + try { + dropTableIfExists(stmt, tableName); + commitIfNecessary(connection); + } catch (SQLException ex) { + throw safeRollback(connection, ex); + } finally { + stmt.close(); + } + } + + protected void dropTableIfExists(Statement stmt, String tableName) throws SQLException + { + String sql = String.format("DROP TABLE IF EXISTS %s", quoteIdentifierString(tableName)); + executeUpdate(stmt, sql); + } + + public void dropTable(String tableName) throws SQLException + { + Statement stmt = connection.createStatement(); + try { + dropTable(stmt, tableName); + commitIfNecessary(connection); + } catch (SQLException ex) { + throw safeRollback(connection, ex); + } finally { + stmt.close(); + } + } + + protected void dropTable(Statement stmt, String tableName) throws SQLException + { + String sql = String.format("DROP TABLE %s", quoteIdentifierString(tableName)); + executeUpdate(stmt, sql); + } + + public void createTableIfNotExists(String tableName, JdbcSchema schema) throws SQLException + { + Statement stmt = connection.createStatement(); + try { + String sql = buildCreateTableIfNotExistsSql(tableName, schema); + executeUpdate(stmt, sql); + commitIfNecessary(connection); + } catch (SQLException ex) { + throw safeRollback(connection, ex); + } finally { + stmt.close(); + } + } + + protected String buildCreateTableIfNotExistsSql(String name, JdbcSchema schema) + { + StringBuilder sb = new StringBuilder(); + + sb.append("CREATE TABLE IF NOT EXISTS "); + quoteIdentifierString(sb, name); + sb.append(buildCreateTableSchemaSql(schema)); + return sb.toString(); + } + + protected String buildCreateTableSchemaSql(JdbcSchema schema) + { + StringBuilder sb = new StringBuilder(); + + sb.append(" ("); + for (int i=0; i < schema.getCount(); i++) { + if (i != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(i)); + sb.append(" "); + String typeName = getCreateTableTypeName(schema.getColumn(i)); + sb.append(typeName); + } + sb.append(")"); + + return sb.toString(); + } + + public static enum ColumnDeclareType + { + SIMPLE, + SIZE, + SIZE_AND_SCALE, + SIZE_AND_OPTIONAL_SCALE, + }; + + protected String getCreateTableTypeName(JdbcColumn c) + { + if (c.getDeclaredType().isPresent()) { + return c.getDeclaredType().get(); + } else { + return buildColumnTypeName(c); + } + } + + protected String buildColumnTypeName(JdbcColumn c) + { + String simpleTypeName = c.getSimpleTypeName(); + switch (getColumnDeclareType(simpleTypeName, c)) { + case SIZE: + return String.format("%s(%d)", simpleTypeName, c.getSizeTypeParameter()); + case SIZE_AND_SCALE: + if (c.getScaleTypeParameter() < 0) { + return String.format("%s(%d,0)", simpleTypeName, c.getSizeTypeParameter()); + } else { + return String.format("%s(%d,%d)", simpleTypeName, c.getSizeTypeParameter(), c.getScaleTypeParameter()); + } + case SIZE_AND_OPTIONAL_SCALE: + if (c.getScaleTypeParameter() < 0) { + return String.format("%s(%d)", simpleTypeName, c.getSizeTypeParameter()); + } else { + return String.format("%s(%d,%d)", simpleTypeName, c.getSizeTypeParameter(), c.getScaleTypeParameter()); + } + default: // SIMPLE + return simpleTypeName; + } + } + + // TODO + private static final String[] STANDARD_SIZE_TYPE_NAMES = new String[] { + "CHAR", + "VARCHAR", "CHAR VARYING", "CHARACTER VARYING", "LONGVARCHAR", + "NCHAR", + "NVARCHAR", "NCHAR VARYING", "NATIONAL CHAR VARYING", "NATIONAL CHARACTER VARYING", + "BINARY", + "VARBINARY", "BINARY VARYING", "LONGVARBINARY", + "BIT", + "VARBIT", "BIT VARYING", + "FLOAT", // SQL standard's FLOAT[(p)] optionally accepts precision + }; + + private static final String[] STANDARD_SIZE_AND_SCALE_TYPE_NAMES = new String[] { + "DECIMAL", + }; + + protected ColumnDeclareType getColumnDeclareType(String convertedTypeName, JdbcColumn col) + { + for (String x : STANDARD_SIZE_TYPE_NAMES) { + if (x.equals(convertedTypeName)) { + return ColumnDeclareType.SIZE; + } + } + + for (String x : STANDARD_SIZE_AND_SCALE_TYPE_NAMES) { + if (x.equals(convertedTypeName)) { + return ColumnDeclareType.SIZE_AND_SCALE; + } + } + + return ColumnDeclareType.SIMPLE; + } + + public PreparedStatement prepareBatchInsertStatement(String toTable, JdbcSchema toTableSchema, Optional<List<String>> mergeKeys) throws SQLException + { + String sql; + if (mergeKeys.isPresent()) { + sql = buildPreparedMergeSql(toTable, toTableSchema, mergeKeys.get()); + } else { + sql = buildPreparedInsertSql(toTable, toTableSchema); + } + logger.info("Prepared SQL: {}", sql); + return connection.prepareStatement(sql); + } + + protected String buildPreparedInsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException + { + StringBuilder sb = new StringBuilder(); + + sb.append("INSERT INTO "); + quoteIdentifierString(sb, toTable); + + sb.append(" ("); + for (int i=0; i < toTableSchema.getCount(); i++) { + if(i != 0) { sb.append(", "); } + quoteIdentifierString(sb, toTableSchema.getColumnName(i)); + } + sb.append(") VALUES ("); + for(int i=0; i < toTableSchema.getCount(); i++) { + if(i != 0) { sb.append(", "); } + sb.append("?"); + } + sb.append(")"); + + return sb.toString(); + } + + protected String buildPreparedMergeSql(String toTable, JdbcSchema toTableSchema, List<String> mergeKeys) throws SQLException + { + throw new UnsupportedOperationException("not implemented"); + } + + protected void collectInsert(List<String> fromTables, JdbcSchema schema, String toTable, + boolean truncateDestinationFirst) throws SQLException + { + Statement stmt = connection.createStatement(); + try { + if (truncateDestinationFirst) { + String sql = buildTruncateSql(toTable); + executeUpdate(stmt, sql); + } + String sql = buildCollectInsertSql(fromTables, schema, toTable); + executeUpdate(stmt, sql); + commitIfNecessary(connection); + } catch (SQLException ex) { + throw safeRollback(connection, ex); + } finally { + stmt.close(); + } + } + + protected String buildTruncateSql(String table) + { + StringBuilder sb = new StringBuilder(); + + sb.append("DELETE FROM "); + quoteIdentifierString(sb, table); + + return sb.toString(); + } + + protected String buildCollectInsertSql(List<String> fromTables, JdbcSchema schema, String toTable) + { + StringBuilder sb = new StringBuilder(); + + sb.append("INSERT INTO "); + quoteIdentifierString(sb, toTable); + sb.append(" ("); + for (int i=0; i < schema.getCount(); i++) { + if (i != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(i)); + } + sb.append(") "); + for (int i=0; i < fromTables.size(); i++) { + if (i != 0) { sb.append(" UNION ALL "); } + sb.append("SELECT "); + for (int j=0; j < schema.getCount(); j++) { + if (j != 0) { sb.append(", "); } + quoteIdentifierString(sb, schema.getColumnName(j)); + } + sb.append(" FROM "); + quoteIdentifierString(sb, fromTables.get(i)); + } + + return sb.toString(); + } + + protected void collectMerge(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException + { + Statement stmt = connection.createStatement(); + try { + String sql = buildCollectMergeSql(fromTables, schema, toTable, mergeKeys); + executeUpdate(stmt, sql); + commitIfNecessary(connection); + } catch (SQLException ex) { + throw safeRollback(connection, ex); + } finally { + stmt.close(); + } + } + + protected String buildCollectMergeSql(List<String> fromTables, JdbcSchema schema, String toTable, List<String> mergeKeys) throws SQLException + { + throw new UnsupportedOperationException("not implemented"); + } + + public void replaceTable(String fromTable, JdbcSchema schema, String toTable) throws SQLException + { + Statement stmt = connection.createStatement(); + try { + dropTableIfExists(stmt, toTable); + + StringBuilder sb = new StringBuilder(); + sb.append("ALTER TABLE "); + quoteIdentifierString(sb, fromTable); + sb.append(" RENAME TO "); + quoteIdentifierString(sb, toTable); + String sql = sb.toString(); + executeUpdate(stmt, sql); + + commitIfNecessary(connection); + } catch (SQLException ex) { + throw safeRollback(connection, ex); + } finally { + stmt.close(); + } + } + + protected void quoteIdentifierString(StringBuilder sb, String str) + { + sb.append(quoteIdentifierString(str, identifierQuoteString)); + } + + protected String quoteIdentifierString(String str) + { + return quoteIdentifierString(str, identifierQuoteString); + } + + protected String quoteIdentifierString(String str, String quoteString) + { + // TODO if identifierQuoteString.equals(" ") && str.contains([^a-zA-Z0-9_connection.getMetaData().getExtraNameCharacters()]) + // TODO if str.contains(identifierQuoteString); + return quoteString + str + quoteString; + } + + // PostgreSQL JDBC driver implements isValid() method. But the + // implementation throws following exception: + // "java.io.IOException: Method org.postgresql.jdbc4.Jdbc4Connection.isValid(int) is not yet implemented." + // + // So, checking mechanism doesn't work at all. + // Thus here just runs "SELECT 1" to check connectivity. + // + public boolean isValidConnection(int timeout) throws SQLException + { + Statement stmt = connection.createStatement(); + try { + stmt.executeQuery("SELECT 1").close(); + return true; + } catch (SQLException ex) { + return false; + } finally { + stmt.close(); + } + } + + protected String[] getDeterministicSqlStates() + { + return new String[0]; + } + + protected int[] getDeterministicErrorCodes() + { + return new int[0]; + } + + protected Class[] getDeterministicRootCauses() + { + return new Class[] { + // Don't retry on UnknownHostException. + java.net.UnknownHostException.class, + + //// we should not retry on connect() error? + //java.net.ConnectException.class, + }; + } + + public boolean isRetryableException(SQLException exception) + { + String sqlState = exception.getSQLState(); + for (String deterministic : getDeterministicSqlStates()) { + if (sqlState.equals(deterministic)) { + return false; + } + } + + int errorCode = exception.getErrorCode(); + for (int deterministic : getDeterministicErrorCodes()) { + if (errorCode == deterministic) { + return false; + } + } + + Throwable rootCause = getRootCause(exception); + for (Class deterministic : getDeterministicRootCauses()) { + if (deterministic.equals(rootCause.getClass())) { + return false; + } + } + + return true; + } + + private Throwable getRootCause(Throwable e) { + while (e.getCause() != null) { + e = e.getCause(); + } + return e; + } + + protected int executeUpdate(Statement stmt, String sql) throws SQLException + { + logger.info("SQL: " + sql); + long startTime = System.currentTimeMillis(); + int count = stmt.executeUpdate(sql); + double seconds = (System.currentTimeMillis() - startTime) / 1000.0; + if (count == 0) { + logger.info(String.format("> %.2f seconds", seconds)); + } else { + logger.info(String.format("> %.2f seconds (%,d rows)", seconds, count)); + } + return count; + } + + protected void commitIfNecessary(Connection con) throws SQLException + { + if (!con.getAutoCommit()) { + con.commit(); + } + } + + protected SQLException safeRollback(Connection con, SQLException cause) + { + try { + if (!con.getAutoCommit()) { + con.rollback(); + } + return cause; + } catch (SQLException ex) { + if (cause != null) { + cause.addSuppressed(ex); + return cause; + } + return ex; + } + } +}