src/main/java/org/embulk/output/SQLServerOutputPlugin.java in embulk-output-sqlserver-0.6.3 vs src/main/java/org/embulk/output/SQLServerOutputPlugin.java in embulk-output-sqlserver-0.6.4

- old
+ new

@@ -1,14 +1,12 @@ package org.embulk.output; -import java.io.IOException; -import java.sql.SQLException; -import java.util.List; -import java.util.Properties; - +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; +import org.embulk.config.ConfigException; import org.embulk.output.jdbc.AbstractJdbcOutputPlugin; import org.embulk.output.jdbc.BatchInsert; import org.embulk.output.jdbc.MergeConfig; import org.embulk.output.jdbc.StandardBatchInsert; import org.embulk.output.jdbc.setter.ColumnSetterFactory; @@ -16,16 +14,21 @@ import org.embulk.output.sqlserver.NativeBatchInsert; import org.embulk.output.sqlserver.SQLServerOutputConnector; import org.embulk.output.sqlserver.setter.SQLServerColumnSetterFactory; import org.joda.time.DateTimeZone; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.sql.SQLException; +import java.util.Properties; +import static java.util.Locale.ENGLISH; + public class SQLServerOutputPlugin extends AbstractJdbcOutputPlugin { + private static int DEFAULT_PORT = 1433; + public interface SQLServerPluginTask extends PluginTask { @Config("driver_path") @ConfigDefault("null") @@ -66,10 +69,31 @@ @Config("insert_method") @ConfigDefault("\"normal\"") public InsertMethod getInsertMethod(); } + private static class UrlAndProperties { + private final String url; + private final Properties props; + + public UrlAndProperties(String url, Properties props) + { + this.url = url; + this.props = props; + } + + public String getUrl() + { + return this.url; + } + + public Properties getProps() + { + return this.props; + } + } + @Override protected Class<? extends PluginTask> getTaskClass() { return SQLServerPluginTask.class; } @@ -85,16 +109,53 @@ @Override protected SQLServerOutputConnector getConnector(PluginTask task, boolean retryableMetadataOperation) { SQLServerPluginTask sqlServerTask = (SQLServerPluginTask) task; + boolean useJtdsDriver = false; if (sqlServerTask.getDriverPath().isPresent()) { loadDriverJar(sqlServerTask.getDriverPath().get()); + try { + Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"); + } catch(Exception e) { + throw new ConfigException("Driver set at field 'driver_path' doesn't include Microsoft SQLServerDriver", e); + } + } else { + // prefer Microsoft SQLServerDriver if it is in classpath + try { + Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"); + } catch(Exception e) { + logger.info("Using jTDS Driver"); + try { + Class.forName("net.sourceforge.jtds.jdbc.Driver"); + } catch(Exception e2) { + throw new ConfigException("'driver_path' doesn't set and can't found jTDS driver", e2); + + } + useJtdsDriver = true; + } } + UrlAndProperties urlProps = getUrlAndProperties(sqlServerTask, useJtdsDriver); + logger.info("Connecting to {} options {}", urlProps.getUrl(), getPropsWithMaskedSecret(urlProps)); + return new SQLServerOutputConnector(urlProps.getUrl(), urlProps.getProps(), null); + } + + private UrlAndProperties getUrlAndProperties(SQLServerPluginTask sqlServerTask, boolean useJtdsDriver) + { + Properties props = new Properties(); String url; + + props.putAll(sqlServerTask.getOptions()); + if (sqlServerTask.getUser().isPresent()) { + props.setProperty("user", sqlServerTask.getUser().get()); + } + if (sqlServerTask.getPassword().isPresent()) { + props.setProperty("password", sqlServerTask.getPassword().get()); + } + if (sqlServerTask.getUrl().isPresent()) { if (sqlServerTask.getInsertMethod() == InsertMethod.NATIVE) { throw new IllegalArgumentException("Cannot set 'url' when 'insert_method' is 'native'."); } @@ -110,10 +171,40 @@ throw new IllegalArgumentException("Field 'host' is not set."); } if (!sqlServerTask.getDatabase().isPresent()) { throw new IllegalArgumentException("Field 'database' is not set."); } + } + + if(useJtdsDriver) { + // jTDS URL: host:port[/database] or host[/database][;instance=] + // host:port;instance= is allowed but port will be ignored? in this case. + if (sqlServerTask.getInstance().isPresent()) { + if (sqlServerTask.getPort() != DEFAULT_PORT) { + logger.warn("'port: {}' option is ignored because instance option is set", sqlServerTask.getPort()); + } + url = String.format(ENGLISH, "jdbc:jtds:sqlserver://%s", sqlServerTask.getHost().get()); + props.setProperty("instance", sqlServerTask.getInstance().get()); + } + else { + url = String.format(ENGLISH, "jdbc:jtds:sqlserver://%s:%d", sqlServerTask.getHost().get(), sqlServerTask.getPort()); + } + + // /database + if (sqlServerTask.getDatabase().isPresent()) { + url += "/" + sqlServerTask.getDatabase().get(); + } + + // integratedSecutiry is not supported, user + password is required + if (sqlServerTask.getIntegratedSecurity().isPresent()) { + throw new ConfigException("'integratedSecutiry' option is not supported with jTDS driver. Set 'driver_path: /path/to/sqljdbc.jar' option if you want to use Microsoft SQLServerDriver."); + } + + if (!sqlServerTask.getUser().isPresent()) { + throw new ConfigException("'user' option is required but not set."); + } + }else { StringBuilder urlBuilder = new StringBuilder(); if (sqlServerTask.getInstance().isPresent()) { urlBuilder.append(String.format("jdbc:sqlserver://%s\\%s", sqlServerTask.getHost().get(), sqlServerTask.getInstance().get())); } else { @@ -134,25 +225,14 @@ } } url = urlBuilder.toString(); } - - Properties props = new Properties(); - props.putAll(sqlServerTask.getOptions()); - - if (sqlServerTask.getUser().isPresent()) { - props.setProperty("user", sqlServerTask.getUser().get()); - } - logger.info("Connecting to {} options {}", url, props); - if (sqlServerTask.getPassword().isPresent()) { - props.setProperty("password", sqlServerTask.getPassword().get()); - } - - return new SQLServerOutputConnector(url, props, null); + return new UrlAndProperties(url, props); } + @Override protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> mergeConfig) throws IOException, SQLException { SQLServerPluginTask sqlServerTask = (SQLServerPluginTask) task; if (sqlServerTask.getInsertMethod() == InsertMethod.NATIVE) { @@ -164,7 +244,21 @@ @Override protected ColumnSetterFactory newColumnSetterFactory(BatchInsert batch, DateTimeZone defaultTimeZone) { return new SQLServerColumnSetterFactory(batch, defaultTimeZone); + } + + private Properties getPropsWithMaskedSecret(UrlAndProperties urlAndProperties) + { + Properties safeProps = new Properties(); + Properties originalProps = urlAndProperties.getProps(); + for(String key : originalProps.stringPropertyNames()) { + if (key.equals("password")) { + safeProps.setProperty(key, "***"); + } else { + safeProps.setProperty(key, originalProps.getProperty(key)); + } + } + return safeProps; } }