src/main/java/org/embulk/output/redshift/RedshiftOutputConnection.java in embulk-output-redshift-0.4.0 vs src/main/java/org/embulk/output/redshift/RedshiftOutputConnection.java in embulk-output-redshift-0.4.1
- old
+ new
@@ -1,122 +1,122 @@
-package org.embulk.output.redshift;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import org.slf4j.Logger;
-import org.embulk.spi.Exec;
-import org.embulk.output.jdbc.JdbcOutputConnection;
-import org.embulk.output.jdbc.JdbcColumn;
-import org.embulk.output.jdbc.JdbcSchema;
-
-public class RedshiftOutputConnection
- extends JdbcOutputConnection
-{
- private final Logger logger = Exec.getLogger(RedshiftOutputConnection.class);
-
- public RedshiftOutputConnection(Connection connection, String schemaName, boolean autoCommit)
- throws SQLException
- {
- super(connection, schemaName);
- connection.setAutoCommit(autoCommit);
- }
-
- // Redshift does not support DROP TABLE IF EXISTS.
- // Here runs DROP TABLE and ignores errors.
- @Override
- public void dropTableIfExists(String tableName) throws SQLException
- {
- Statement stmt = connection.createStatement();
- try {
- String sql = String.format("DROP TABLE IF EXISTS %s", quoteIdentifierString(tableName));
- executeUpdate(stmt, sql);
- commitIfNecessary(connection);
- } catch (SQLException ex) {
- // ignore errors.
- // TODO here should ignore only 'table "XXX" does not exist' errors.
- SQLException ignored = safeRollback(connection, ex);
- } finally {
- stmt.close();
- }
- }
-
- // Redshift does not support DROP TABLE IF EXISTS.
- // Dropping part runs DROP TABLE and ignores errors.
- @Override
- public void replaceTable(String fromTable, JdbcSchema schema, String toTable) throws SQLException
- {
- Statement stmt = connection.createStatement();
- try {
- try {
- StringBuilder sb = new StringBuilder();
- sb.append("DROP TABLE ");
- quoteIdentifierString(sb, toTable);
- String sql = sb.toString();
- executeUpdate(stmt, sql);
- } catch (SQLException ex) {
- // ignore errors.
- // TODO here should ignore only 'table "XXX" does not exist' errors.
- // rollback or comimt is required to recover failed transaction
- SQLException ignored = safeRollback(connection, ex);
- }
-
- {
- StringBuilder sb = new StringBuilder();
- sb.append("ALTER TABLE ");
- quoteIdentifierString(sb, fromTable);
- sb.append(" RENAME TO ");
- quoteIdentifierString(sb, toTable);
- String sql = sb.toString();
- executeUpdate(stmt, sql);
- }
-
- commitIfNecessary(connection);
- } catch (SQLException ex) {
- throw safeRollback(connection, ex);
- } finally {
- stmt.close();
- }
- }
-
- @Override
- protected String buildColumnTypeName(JdbcColumn c)
- {
- // Redshift does not support TEXT type.
- switch(c.getSimpleTypeName()) {
- case "CLOB":
- return "VARCHAR(65535)";
- case "TEXT":
- return "VARCHAR(65535)";
- case "BLOB":
- return "BYTEA";
- default:
- return super.buildColumnTypeName(c);
- }
- }
-
- public String buildCopySQLBeforeFrom(String tableName, JdbcSchema tableSchema)
- {
- StringBuilder sb = new StringBuilder();
-
- sb.append("COPY ");
- quoteIdentifierString(sb, tableName);
- sb.append(" (");
- for(int i=0; i < tableSchema.getCount(); i++) {
- if(i != 0) { sb.append(", "); }
- quoteIdentifierString(sb, tableSchema.getColumnName(i));
- }
- sb.append(")");
-
- return sb.toString();
- }
-
- public void runCopy(String sql) throws SQLException
- {
- Statement stmt = connection.createStatement();
- try {
- stmt.executeUpdate(sql);
- } finally {
- stmt.close();
- }
- }
-}
+package org.embulk.output.redshift;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.slf4j.Logger;
+import org.embulk.spi.Exec;
+import org.embulk.output.jdbc.JdbcOutputConnection;
+import org.embulk.output.jdbc.JdbcColumn;
+import org.embulk.output.jdbc.JdbcSchema;
+
+public class RedshiftOutputConnection
+ extends JdbcOutputConnection
+{
+ private final Logger logger = Exec.getLogger(RedshiftOutputConnection.class);
+
+ public RedshiftOutputConnection(Connection connection, String schemaName, boolean autoCommit)
+ throws SQLException
+ {
+ super(connection, schemaName);
+ connection.setAutoCommit(autoCommit);
+ }
+
+ // Redshift does not support DROP TABLE IF EXISTS.
+ // Here runs DROP TABLE and ignores errors.
+ @Override
+ public void dropTableIfExists(String tableName) throws SQLException
+ {
+ Statement stmt = connection.createStatement();
+ try {
+ String sql = String.format("DROP TABLE IF EXISTS %s", quoteIdentifierString(tableName));
+ executeUpdate(stmt, sql);
+ commitIfNecessary(connection);
+ } catch (SQLException ex) {
+ // ignore errors.
+ // TODO here should ignore only 'table "XXX" does not exist' errors.
+ SQLException ignored = safeRollback(connection, ex);
+ } finally {
+ stmt.close();
+ }
+ }
+
+ // Redshift does not support DROP TABLE IF EXISTS.
+ // Dropping part runs DROP TABLE and ignores errors.
+ @Override
+ public void replaceTable(String fromTable, JdbcSchema schema, String toTable) throws SQLException
+ {
+ Statement stmt = connection.createStatement();
+ try {
+ try {
+ StringBuilder sb = new StringBuilder();
+ sb.append("DROP TABLE ");
+ quoteIdentifierString(sb, toTable);
+ String sql = sb.toString();
+ executeUpdate(stmt, sql);
+ } catch (SQLException ex) {
+ // ignore errors.
+ // TODO here should ignore only 'table "XXX" does not exist' errors.
+ // rollback or comimt is required to recover failed transaction
+ SQLException ignored = safeRollback(connection, ex);
+ }
+
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("ALTER TABLE ");
+ quoteIdentifierString(sb, fromTable);
+ sb.append(" RENAME TO ");
+ quoteIdentifierString(sb, toTable);
+ String sql = sb.toString();
+ executeUpdate(stmt, sql);
+ }
+
+ commitIfNecessary(connection);
+ } catch (SQLException ex) {
+ throw safeRollback(connection, ex);
+ } finally {
+ stmt.close();
+ }
+ }
+
+ @Override
+ protected String buildColumnTypeName(JdbcColumn c)
+ {
+ // Redshift does not support TEXT type.
+ switch(c.getSimpleTypeName()) {
+ case "CLOB":
+ return "VARCHAR(65535)";
+ case "TEXT":
+ return "VARCHAR(65535)";
+ case "BLOB":
+ return "BYTEA";
+ default:
+ return super.buildColumnTypeName(c);
+ }
+ }
+
+ public String buildCopySQLBeforeFrom(String tableName, JdbcSchema tableSchema)
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("COPY ");
+ quoteIdentifierString(sb, tableName);
+ sb.append(" (");
+ for(int i=0; i < tableSchema.getCount(); i++) {
+ if(i != 0) { sb.append(", "); }
+ quoteIdentifierString(sb, tableSchema.getColumnName(i));
+ }
+ sb.append(")");
+
+ return sb.toString();
+ }
+
+ public void runCopy(String sql) throws SQLException
+ {
+ Statement stmt = connection.createStatement();
+ try {
+ stmt.executeUpdate(sql);
+ } finally {
+ stmt.close();
+ }
+ }
+}