src/main/java/org/embulk/output/MySQLOutputPlugin.java in embulk-output-mysql-0.7.8 vs src/main/java/org/embulk/output/MySQLOutputPlugin.java in embulk-output-mysql-0.7.9
- old
+ new
@@ -1,18 +1,24 @@
package org.embulk.output;
import java.util.Properties;
import java.io.IOException;
import java.sql.SQLException;
+
import com.google.common.base.Optional;
+
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.output.jdbc.AbstractJdbcOutputPlugin;
import org.embulk.output.jdbc.BatchInsert;
+import org.embulk.output.jdbc.JdbcOutputConnection;
import org.embulk.output.jdbc.MergeConfig;
+import org.embulk.output.jdbc.TableIdentifier;
+import org.embulk.output.mysql.MySQLOutputConnection;
import org.embulk.output.mysql.MySQLOutputConnector;
import org.embulk.output.mysql.MySQLBatchInsert;
+import org.embulk.spi.Schema;
public class MySQLOutputPlugin
extends AbstractJdbcOutputPlugin
{
public interface MySQLPluginTask
@@ -32,10 +38,14 @@
@ConfigDefault("\"\"")
public String getPassword();
@Config("database")
public String getDatabase();
+
+ @Config("temp_database")
+ @ConfigDefault("null")
+ public Optional<String> getTempDatabase();
}
@Override
protected Class<? extends PluginTask> getTaskClass()
{
@@ -102,10 +112,19 @@
return new MySQLOutputConnector(url, props);
}
@Override
+ protected TableIdentifier buildIntermediateTableId(JdbcOutputConnection con, PluginTask task, String tableName) {
+ MySQLPluginTask t = (MySQLPluginTask) task;
+ if (t.getTempDatabase().isPresent()) {
+ return new TableIdentifier(t.getTempDatabase().get(), null, tableName);
+ }
+ return super.buildIntermediateTableId(con, task, tableName);
+ }
+
+ @Override
protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> mergeConfig) throws IOException, SQLException
{
return new MySQLBatchInsert(getConnector(task, true), mergeConfig);
}
@@ -119,7 +138,16 @@
case 1205: // ER_LOCK_WAIT_TIMEOUT (Message: Lock wait timeout exceeded; try restarting transaction)
return true;
default:
return false;
}
+ }
+
+ @Override
+ protected void doBegin(JdbcOutputConnection con,
+ PluginTask task, final Schema schema, int taskCount) throws SQLException
+ {
+ MySQLOutputConnection mySQLCon = (MySQLOutputConnection)con;
+ mySQLCon.compareTimeZone();
+ super.doBegin(con,task,schema,taskCount);
}
}