src/main/java/org/embulk/output/SQLServerOutputPlugin.java in embulk-output-sqlserver-0.6.3 vs src/main/java/org/embulk/output/SQLServerOutputPlugin.java in embulk-output-sqlserver-0.6.4
- old
+ new
@@ -1,14 +1,12 @@
package org.embulk.output;
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Properties;
-
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
+import org.embulk.config.ConfigException;
import org.embulk.output.jdbc.AbstractJdbcOutputPlugin;
import org.embulk.output.jdbc.BatchInsert;
import org.embulk.output.jdbc.MergeConfig;
import org.embulk.output.jdbc.StandardBatchInsert;
import org.embulk.output.jdbc.setter.ColumnSetterFactory;
@@ -16,16 +14,21 @@
import org.embulk.output.sqlserver.NativeBatchInsert;
import org.embulk.output.sqlserver.SQLServerOutputConnector;
import org.embulk.output.sqlserver.setter.SQLServerColumnSetterFactory;
import org.joda.time.DateTimeZone;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Properties;
+import static java.util.Locale.ENGLISH;
+
public class SQLServerOutputPlugin
extends AbstractJdbcOutputPlugin
{
+ private static int DEFAULT_PORT = 1433;
+
public interface SQLServerPluginTask
extends PluginTask
{
@Config("driver_path")
@ConfigDefault("null")
@@ -66,10 +69,31 @@
@Config("insert_method")
@ConfigDefault("\"normal\"")
public InsertMethod getInsertMethod();
}
+ private static class UrlAndProperties {
+ private final String url;
+ private final Properties props;
+
+ public UrlAndProperties(String url, Properties props)
+ {
+ this.url = url;
+ this.props = props;
+ }
+
+ public String getUrl()
+ {
+ return this.url;
+ }
+
+ public Properties getProps()
+ {
+ return this.props;
+ }
+ }
+
@Override
protected Class<? extends PluginTask> getTaskClass()
{
return SQLServerPluginTask.class;
}
@@ -85,16 +109,53 @@
@Override
protected SQLServerOutputConnector getConnector(PluginTask task, boolean retryableMetadataOperation)
{
SQLServerPluginTask sqlServerTask = (SQLServerPluginTask) task;
+ boolean useJtdsDriver = false;
if (sqlServerTask.getDriverPath().isPresent()) {
loadDriverJar(sqlServerTask.getDriverPath().get());
+ try {
+ Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+ } catch(Exception e) {
+ throw new ConfigException("Driver set at field 'driver_path' doesn't include Microsoft SQLServerDriver", e);
+ }
+ } else {
+ // prefer Microsoft SQLServerDriver if it is in classpath
+ try {
+ Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+ } catch(Exception e) {
+ logger.info("Using jTDS Driver");
+ try {
+ Class.forName("net.sourceforge.jtds.jdbc.Driver");
+ } catch(Exception e2) {
+ throw new ConfigException("'driver_path' doesn't set and can't found jTDS driver", e2);
+
+ }
+ useJtdsDriver = true;
+ }
}
+ UrlAndProperties urlProps = getUrlAndProperties(sqlServerTask, useJtdsDriver);
+ logger.info("Connecting to {} options {}", urlProps.getUrl(), getPropsWithMaskedSecret(urlProps));
+ return new SQLServerOutputConnector(urlProps.getUrl(), urlProps.getProps(), null);
+ }
+
+ private UrlAndProperties getUrlAndProperties(SQLServerPluginTask sqlServerTask, boolean useJtdsDriver)
+ {
+ Properties props = new Properties();
String url;
+
+ props.putAll(sqlServerTask.getOptions());
+ if (sqlServerTask.getUser().isPresent()) {
+ props.setProperty("user", sqlServerTask.getUser().get());
+ }
+ if (sqlServerTask.getPassword().isPresent()) {
+ props.setProperty("password", sqlServerTask.getPassword().get());
+ }
+
if (sqlServerTask.getUrl().isPresent()) {
if (sqlServerTask.getInsertMethod() == InsertMethod.NATIVE) {
throw new IllegalArgumentException("Cannot set 'url' when 'insert_method' is 'native'.");
}
@@ -110,10 +171,40 @@
throw new IllegalArgumentException("Field 'host' is not set.");
}
if (!sqlServerTask.getDatabase().isPresent()) {
throw new IllegalArgumentException("Field 'database' is not set.");
}
+ }
+
+ if(useJtdsDriver) {
+ // jTDS URL: host:port[/database] or host[/database][;instance=]
+ // host:port;instance= is allowed but port will be ignored? in this case.
+ if (sqlServerTask.getInstance().isPresent()) {
+ if (sqlServerTask.getPort() != DEFAULT_PORT) {
+ logger.warn("'port: {}' option is ignored because instance option is set", sqlServerTask.getPort());
+ }
+ url = String.format(ENGLISH, "jdbc:jtds:sqlserver://%s", sqlServerTask.getHost().get());
+ props.setProperty("instance", sqlServerTask.getInstance().get());
+ }
+ else {
+ url = String.format(ENGLISH, "jdbc:jtds:sqlserver://%s:%d", sqlServerTask.getHost().get(), sqlServerTask.getPort());
+ }
+
+ // /database
+ if (sqlServerTask.getDatabase().isPresent()) {
+ url += "/" + sqlServerTask.getDatabase().get();
+ }
+
+ // integratedSecutiry is not supported, user + password is required
+ if (sqlServerTask.getIntegratedSecurity().isPresent()) {
+ throw new ConfigException("'integratedSecutiry' option is not supported with jTDS driver. Set 'driver_path: /path/to/sqljdbc.jar' option if you want to use Microsoft SQLServerDriver.");
+ }
+
+ if (!sqlServerTask.getUser().isPresent()) {
+ throw new ConfigException("'user' option is required but not set.");
+ }
+ }else {
StringBuilder urlBuilder = new StringBuilder();
if (sqlServerTask.getInstance().isPresent()) {
urlBuilder.append(String.format("jdbc:sqlserver://%s\\%s",
sqlServerTask.getHost().get(), sqlServerTask.getInstance().get()));
} else {
@@ -134,25 +225,14 @@
}
}
url = urlBuilder.toString();
}
-
- Properties props = new Properties();
- props.putAll(sqlServerTask.getOptions());
-
- if (sqlServerTask.getUser().isPresent()) {
- props.setProperty("user", sqlServerTask.getUser().get());
- }
- logger.info("Connecting to {} options {}", url, props);
- if (sqlServerTask.getPassword().isPresent()) {
- props.setProperty("password", sqlServerTask.getPassword().get());
- }
-
- return new SQLServerOutputConnector(url, props, null);
+ return new UrlAndProperties(url, props);
}
+
@Override
protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> mergeConfig) throws IOException, SQLException
{
SQLServerPluginTask sqlServerTask = (SQLServerPluginTask) task;
if (sqlServerTask.getInsertMethod() == InsertMethod.NATIVE) {
@@ -164,7 +244,21 @@
@Override
protected ColumnSetterFactory newColumnSetterFactory(BatchInsert batch, DateTimeZone defaultTimeZone)
{
return new SQLServerColumnSetterFactory(batch, defaultTimeZone);
+ }
+
+ private Properties getPropsWithMaskedSecret(UrlAndProperties urlAndProperties)
+ {
+ Properties safeProps = new Properties();
+ Properties originalProps = urlAndProperties.getProps();
+ for(String key : originalProps.stringPropertyNames()) {
+ if (key.equals("password")) {
+ safeProps.setProperty(key, "***");
+ } else {
+ safeProps.setProperty(key, originalProps.getProperty(key));
+ }
+ }
+ return safeProps;
}
}