src/main/java/org/embulk/output/MySQLOutputPlugin.java in embulk-output-mysql-0.7.8 vs src/main/java/org/embulk/output/MySQLOutputPlugin.java in embulk-output-mysql-0.7.9

- old
+ new

@@ -1,18 +1,24 @@ package org.embulk.output; import java.util.Properties; import java.io.IOException; import java.sql.SQLException; + import com.google.common.base.Optional; + import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.output.jdbc.AbstractJdbcOutputPlugin; import org.embulk.output.jdbc.BatchInsert; +import org.embulk.output.jdbc.JdbcOutputConnection; import org.embulk.output.jdbc.MergeConfig; +import org.embulk.output.jdbc.TableIdentifier; +import org.embulk.output.mysql.MySQLOutputConnection; import org.embulk.output.mysql.MySQLOutputConnector; import org.embulk.output.mysql.MySQLBatchInsert; +import org.embulk.spi.Schema; public class MySQLOutputPlugin extends AbstractJdbcOutputPlugin { public interface MySQLPluginTask @@ -32,10 +38,14 @@ @ConfigDefault("\"\"") public String getPassword(); @Config("database") public String getDatabase(); + + @Config("temp_database") + @ConfigDefault("null") + public Optional<String> getTempDatabase(); } @Override protected Class<? extends PluginTask> getTaskClass() { @@ -102,10 +112,19 @@ return new MySQLOutputConnector(url, props); } @Override + protected TableIdentifier buildIntermediateTableId(JdbcOutputConnection con, PluginTask task, String tableName) { + MySQLPluginTask t = (MySQLPluginTask) task; + if (t.getTempDatabase().isPresent()) { + return new TableIdentifier(t.getTempDatabase().get(), null, tableName); + } + return super.buildIntermediateTableId(con, task, tableName); + } + + @Override protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> mergeConfig) throws IOException, SQLException { return new MySQLBatchInsert(getConnector(task, true), mergeConfig); } @@ -119,7 +138,16 @@ case 1205: // ER_LOCK_WAIT_TIMEOUT (Message: Lock wait timeout exceeded; try restarting transaction) return true; default: return false; } + } + + @Override + protected void doBegin(JdbcOutputConnection con, + PluginTask task, final Schema schema, int taskCount) throws SQLException + { + MySQLOutputConnection mySQLCon = (MySQLOutputConnection)con; + mySQLCon.compareTimeZone(); + super.doBegin(con,task,schema,taskCount); } }