src/main/java/org/embulk/output/oracle/OracleOutputConnection.java in embulk-output-oracle-0.2.1 vs src/main/java/org/embulk/output/oracle/OracleOutputConnection.java in embulk-output-oracle-0.2.2

- old
+ new

@@ -1,24 +1,44 @@ package org.embulk.output.oracle; +import java.nio.charset.Charset; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; import org.embulk.output.jdbc.JdbcOutputConnection; import org.embulk.output.jdbc.JdbcSchema; public class OracleOutputConnection extends JdbcOutputConnection { - public OracleOutputConnection(Connection connection, boolean autoCommit) + private static final Map<String, String> CHARSET_NAMES = new HashMap<String, String>(); + static { + CHARSET_NAMES.put("JA16SJIS", "Shift_JIS"); + CHARSET_NAMES.put("JA16SJISTILDE", "Shift_JIS"); + CHARSET_NAMES.put("JA16EUC", "EUC-JP"); + CHARSET_NAMES.put("JA16EUCTILDE", "EUC-JP"); + CHARSET_NAMES.put("AL32UTF8", "UTF-8"); + CHARSET_NAMES.put("UTF8", "UTF-8"); + CHARSET_NAMES.put("AL16UTF16", "UTF-16"); + } + + private final boolean direct; + + + public OracleOutputConnection(Connection connection, boolean autoCommit, boolean direct) throws SQLException { super(connection, getSchema(connection)); connection.setAutoCommit(autoCommit); + + this.direct = direct; } @Override protected String convertTypeName(String typeName) { @@ -70,6 +90,45 @@ } throw new SQLException(String.format("Cannot get schema becase \"%s\" didn't return any value.", sql)); } } } + + @Override + protected String buildPrepareInsertSql(String toTable, JdbcSchema toTableSchema) throws SQLException + { + String sql = super.buildPrepareInsertSql(toTable, toTableSchema); + if (direct) { + sql = sql.replaceAll("^INSERT ", "INSERT /*+ APPEND_VALUES */ "); + } + return sql; + } + + public OracleCharset getCharset() throws SQLException + { + String charsetName = "UTF8"; + try (Statement statement = connection.createStatement()) { + try (ResultSet resultSet = statement.executeQuery("SELECT VALUE FROM NLS_DATABASE_PARAMETERS WHERE PARAMETER='NLS_CHARACTERSET'")) { + if (resultSet.next()) { + String nlsCharacterSet = resultSet.getString(1); + if (CHARSET_NAMES.containsKey(nlsCharacterSet)) { + charsetName = nlsCharacterSet; + } + } + } + } + + try (PreparedStatement statement = connection.prepareStatement("SELECT NLS_CHARSET_ID(?) FROM DUAL")) { + statement.setString(1, charsetName); + try (ResultSet resultSet = statement.executeQuery()) { + if (!resultSet.next()) { + throw new SQLException("Unknown NLS_CHARACTERSET : " + charsetName); + } + + return new OracleCharset(charsetName, + resultSet.getShort(1), + Charset.forName(CHARSET_NAMES.get(charsetName))); + } + } + } + }