src/main/java/org/embulk/output/MySQLOutputPlugin.java in embulk-output-mysql-0.2.4 vs src/main/java/org/embulk/output/MySQLOutputPlugin.java in embulk-output-mysql-0.3.0

- old
+ new

@@ -1,12 +1,12 @@ package org.embulk.output; +import java.util.List; import java.util.Properties; import java.io.IOException; import java.sql.SQLException; - -import org.embulk.output.mysql.MySQLBatchUpsert; +import com.google.common.base.Optional; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.output.jdbc.AbstractJdbcOutputPlugin; import org.embulk.output.jdbc.BatchInsert; import org.embulk.output.mysql.MySQLOutputConnector; @@ -41,20 +41,26 @@ { return MySQLPluginTask.class; } @Override + protected Features getFeatures(PluginTask task) + { + return new Features() + .setMaxTableNameLength(64) + .setIgnoreMergeKeys(true); + } + + @Override protected MySQLOutputConnector getConnector(PluginTask task, boolean retryableMetadataOperation) { MySQLPluginTask t = (MySQLPluginTask) task; String url = String.format("jdbc:mysql://%s:%d/%s", t.getHost(), t.getPort(), t.getDatabase()); Properties props = new Properties(); - props.setProperty("user", t.getUser()); - props.setProperty("password", t.getPassword()); props.setProperty("rewriteBatchedStatements", "true"); props.setProperty("useCompression", "true"); props.setProperty("connectTimeout", "300000"); // milliseconds @@ -86,15 +92,20 @@ props.setProperty("socketTimeout", "2700000"); // milliseconds } props.putAll(t.getOptions()); + // TODO validate task.getMergeKeys is null + + props.setProperty("user", t.getUser()); + logger.info("Connecting to {} options {}", url, props); + props.setProperty("password", t.getPassword()); + return new MySQLOutputConnector(url, props); } @Override - protected BatchInsert newBatchInsert(PluginTask task) throws IOException, SQLException + protected BatchInsert newBatchInsert(PluginTask task, Optional<List<String>> mergeKeys) throws IOException, SQLException { - MySQLOutputConnector connector = getConnector(task, true); - return task.getMode().isMerge() ? new MySQLBatchUpsert(connector) : new MySQLBatchInsert(connector); + return new MySQLBatchInsert(getConnector(task, true), mergeKeys); } }