src/main/java/org/embulk/output/MySQLOutputPlugin.java in embulk-output-mysql-0.2.4 vs src/main/java/org/embulk/output/MySQLOutputPlugin.java in embulk-output-mysql-0.3.0
- old
+ new
@@ -1,12 +1,12 @@
package org.embulk.output;
+import java.util.List;
import java.util.Properties;
import java.io.IOException;
import java.sql.SQLException;
-
-import org.embulk.output.mysql.MySQLBatchUpsert;
+import com.google.common.base.Optional;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.output.jdbc.AbstractJdbcOutputPlugin;
import org.embulk.output.jdbc.BatchInsert;
import org.embulk.output.mysql.MySQLOutputConnector;
@@ -41,20 +41,26 @@
{
return MySQLPluginTask.class;
}
@Override
+ protected Features getFeatures(PluginTask task)
+ {
+ return new Features()
+ .setMaxTableNameLength(64)
+ .setIgnoreMergeKeys(true);
+ }
+
+ @Override
protected MySQLOutputConnector getConnector(PluginTask task, boolean retryableMetadataOperation)
{
MySQLPluginTask t = (MySQLPluginTask) task;
String url = String.format("jdbc:mysql://%s:%d/%s",
t.getHost(), t.getPort(), t.getDatabase());
Properties props = new Properties();
- props.setProperty("user", t.getUser());
- props.setProperty("password", t.getPassword());
props.setProperty("rewriteBatchedStatements", "true");
props.setProperty("useCompression", "true");
props.setProperty("connectTimeout", "300000"); // milliseconds
@@ -86,15 +92,20 @@
props.setProperty("socketTimeout", "2700000"); // milliseconds
}
props.putAll(t.getOptions());
+ // TODO validate task.getMergeKeys is null
+
+ props.setProperty("user", t.getUser());
+ logger.info("Connecting to {} options {}", url, props);
+ props.setProperty("password", t.getPassword());
+
return new MySQLOutputConnector(url, props);
}
@Override
- protected BatchInsert newBatchInsert(PluginTask task) throws IOException, SQLException
+ protected BatchInsert newBatchInsert(PluginTask task, Optional<List<String>> mergeKeys) throws IOException, SQLException
{
- MySQLOutputConnector connector = getConnector(task, true);
- return task.getMode().isMerge() ? new MySQLBatchUpsert(connector) : new MySQLBatchInsert(connector);
+ return new MySQLBatchInsert(getConnector(task, true), mergeKeys);
}
}