package org.embulk.output; import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigException; import org.embulk.output.jdbc.AbstractJdbcOutputPlugin; import org.embulk.output.jdbc.BatchInsert; import org.embulk.output.jdbc.MergeConfig; import org.embulk.output.jdbc.StandardBatchInsert; import org.embulk.output.jdbc.setter.ColumnSetterFactory; import org.embulk.output.sqlserver.InsertMethod; import org.embulk.output.sqlserver.NativeBatchInsert; import org.embulk.output.sqlserver.SQLServerOutputConnector; import org.embulk.output.sqlserver.setter.SQLServerColumnSetterFactory; import org.joda.time.DateTimeZone; import java.io.IOException; import java.sql.SQLException; import java.util.Properties; import static java.util.Locale.ENGLISH; public class SQLServerOutputPlugin extends AbstractJdbcOutputPlugin { private static int DEFAULT_PORT = 1433; public interface SQLServerPluginTask extends PluginTask { @Config("driver_path") @ConfigDefault("null") public Optional getDriverPath(); @Config("host") @ConfigDefault("null") public Optional getHost(); @Config("port") @ConfigDefault("1433") public int getPort(); @Config("instance") @ConfigDefault("null") public Optional getInstance(); @Config("database") @ConfigDefault("null") public Optional getDatabase(); @Config("integratedSecurity") @ConfigDefault("null") public Optional getIntegratedSecurity(); @Config("url") @ConfigDefault("null") public Optional getUrl(); @Config("user") @ConfigDefault("null") public Optional getUser(); @Config("password") @ConfigDefault("\"\"") public Optional getPassword(); @Config("insert_method") @ConfigDefault("\"normal\"") public InsertMethod getInsertMethod(); } private static class UrlAndProperties { private final String url; private final Properties props; public UrlAndProperties(String url, Properties props) { this.url = url; this.props = props; } public String getUrl() { return this.url; } public Properties getProps() { return this.props; } } @Override protected Class getTaskClass() { return SQLServerPluginTask.class; } @Override protected Features getFeatures(PluginTask task) { return new Features() .setMaxTableNameLength(128) .setSupportedModes(ImmutableSet.of(Mode.INSERT, Mode.INSERT_DIRECT, Mode.MERGE, Mode.TRUNCATE_INSERT, Mode.REPLACE)) .setIgnoreMergeKeys(false); } @Override protected SQLServerOutputConnector getConnector(PluginTask task, boolean retryableMetadataOperation) { SQLServerPluginTask sqlServerTask = (SQLServerPluginTask) task; boolean useJtdsDriver = false; if (sqlServerTask.getDriverPath().isPresent()) { loadDriverJar(sqlServerTask.getDriverPath().get()); try { Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"); } catch(Exception e) { throw new ConfigException("Driver set at field 'driver_path' doesn't include Microsoft SQLServerDriver", e); } } else { // prefer Microsoft SQLServerDriver if it is in classpath try { Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"); } catch(Exception e) { logger.info("Using jTDS Driver"); try { Class.forName("net.sourceforge.jtds.jdbc.Driver"); } catch(Exception e2) { throw new ConfigException("'driver_path' doesn't set and can't found jTDS driver", e2); } useJtdsDriver = true; } } UrlAndProperties urlProps = getUrlAndProperties(sqlServerTask, useJtdsDriver); logger.info("Connecting to {} options {}", urlProps.getUrl(), getPropsWithMaskedSecret(urlProps)); return new SQLServerOutputConnector(urlProps.getUrl(), urlProps.getProps(), null); } private UrlAndProperties getUrlAndProperties(SQLServerPluginTask sqlServerTask, boolean useJtdsDriver) { Properties props = new Properties(); String url; props.putAll(sqlServerTask.getOptions()); if (sqlServerTask.getUser().isPresent()) { props.setProperty("user", sqlServerTask.getUser().get()); } if (sqlServerTask.getPassword().isPresent()) { props.setProperty("password", sqlServerTask.getPassword().get()); } if (sqlServerTask.getUrl().isPresent()) { if (sqlServerTask.getInsertMethod() == InsertMethod.NATIVE) { throw new IllegalArgumentException("Cannot set 'url' when 'insert_method' is 'native'."); } if (sqlServerTask.getHost().isPresent() || sqlServerTask.getInstance().isPresent() || sqlServerTask.getDatabase().isPresent() || sqlServerTask.getIntegratedSecurity().isPresent()) { throw new IllegalArgumentException("'host', 'port', 'instance', 'database' and 'integratedSecurity' parameters are invalid if 'url' parameter is set."); } url = sqlServerTask.getUrl().get(); } else { if (!sqlServerTask.getHost().isPresent()) { throw new IllegalArgumentException("Field 'host' is not set."); } if (!sqlServerTask.getDatabase().isPresent()) { throw new IllegalArgumentException("Field 'database' is not set."); } } if(useJtdsDriver) { // jTDS URL: host:port[/database] or host[/database][;instance=] // host:port;instance= is allowed but port will be ignored? in this case. if (sqlServerTask.getInstance().isPresent()) { if (sqlServerTask.getPort() != DEFAULT_PORT) { logger.warn("'port: {}' option is ignored because instance option is set", sqlServerTask.getPort()); } url = String.format(ENGLISH, "jdbc:jtds:sqlserver://%s", sqlServerTask.getHost().get()); props.setProperty("instance", sqlServerTask.getInstance().get()); } else { url = String.format(ENGLISH, "jdbc:jtds:sqlserver://%s:%d", sqlServerTask.getHost().get(), sqlServerTask.getPort()); } // /database if (sqlServerTask.getDatabase().isPresent()) { url += "/" + sqlServerTask.getDatabase().get(); } // integratedSecutiry is not supported, user + password is required if (sqlServerTask.getIntegratedSecurity().isPresent()) { throw new ConfigException("'integratedSecutiry' option is not supported with jTDS driver. Set 'driver_path: /path/to/sqljdbc.jar' option if you want to use Microsoft SQLServerDriver."); } if (!sqlServerTask.getUser().isPresent()) { throw new ConfigException("'user' option is required but not set."); } }else { StringBuilder urlBuilder = new StringBuilder(); if (sqlServerTask.getInstance().isPresent()) { urlBuilder.append(String.format("jdbc:sqlserver://%s\\%s", sqlServerTask.getHost().get(), sqlServerTask.getInstance().get())); } else { urlBuilder.append(String.format("jdbc:sqlserver://%s:%d", sqlServerTask.getHost().get(), sqlServerTask.getPort())); } if (sqlServerTask.getDatabase().isPresent()) { urlBuilder.append(";databaseName=" + sqlServerTask.getDatabase().get()); } if (sqlServerTask.getIntegratedSecurity().isPresent() && sqlServerTask.getIntegratedSecurity().get()) { urlBuilder.append(";integratedSecurity=" + sqlServerTask.getIntegratedSecurity().get()); } else { if (!sqlServerTask.getUser().isPresent()) { throw new IllegalArgumentException("Field 'user' is not set."); } if (!sqlServerTask.getPassword().isPresent()) { throw new IllegalArgumentException("Field 'password' is not set."); } } url = urlBuilder.toString(); } return new UrlAndProperties(url, props); } @Override protected BatchInsert newBatchInsert(PluginTask task, Optional mergeConfig) throws IOException, SQLException { SQLServerPluginTask sqlServerTask = (SQLServerPluginTask) task; if (sqlServerTask.getInsertMethod() == InsertMethod.NATIVE) { return new NativeBatchInsert(sqlServerTask.getHost().get(), sqlServerTask.getPort(), sqlServerTask.getInstance(), sqlServerTask.getDatabase().get(), sqlServerTask.getUser(), sqlServerTask.getPassword()); } return new StandardBatchInsert(getConnector(task, true), mergeConfig); } @Override protected ColumnSetterFactory newColumnSetterFactory(BatchInsert batch, DateTimeZone defaultTimeZone) { return new SQLServerColumnSetterFactory(batch, defaultTimeZone); } private Properties getPropsWithMaskedSecret(UrlAndProperties urlAndProperties) { Properties safeProps = new Properties(); Properties originalProps = urlAndProperties.getProps(); for(String key : originalProps.stringPropertyNames()) { if (key.equals("password")) { safeProps.setProperty(key, "***"); } else { safeProps.setProperty(key, originalProps.getProperty(key)); } } return safeProps; } }