src/main/java/org/embulk/output/jdbc/StandardBatchInsert.java in embulk-output-jdbc-0.4.0 vs src/main/java/org/embulk/output/jdbc/StandardBatchInsert.java in embulk-output-jdbc-0.4.1

- old
+ new

@@ -1,200 +1,200 @@ -package org.embulk.output.jdbc; - -import java.util.List; -import java.util.Calendar; -import java.io.IOException; -import java.math.BigDecimal; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Date; -import java.sql.Time; -import com.google.common.base.Optional; -import org.slf4j.Logger; -import org.embulk.spi.time.Timestamp; -import org.embulk.spi.Exec; - -public class StandardBatchInsert - implements BatchInsert -{ - private final Logger logger = Exec.getLogger(StandardBatchInsert.class); - - private final JdbcOutputConnector connector; - private final Optional<List<String>> mergeKeys; - - private JdbcOutputConnection connection; - private PreparedStatement batch; - private int index; - private int batchWeight; - private int batchRows; - private long totalRows; - - public StandardBatchInsert(JdbcOutputConnector connector, Optional<List<String>> mergeKeys) throws IOException, SQLException - { - this.connector = connector; - this.mergeKeys = mergeKeys; - } - - public void prepare(String loadTable, JdbcSchema insertSchema) throws SQLException - { - this.connection = connector.connect(true); - this.index = 1; // PreparedStatement index begings from 1 - this.batchRows = 0; - this.totalRows = 0; - this.batch = prepareStatement(loadTable, insertSchema); - batch.clearBatch(); - } - - protected PreparedStatement prepareStatement(String loadTable, JdbcSchema insertSchema) throws SQLException - { - return connection.prepareBatchInsertStatement(loadTable, insertSchema, mergeKeys); - } - - public int getBatchWeight() - { - return batchWeight; - } - - public void add() throws IOException, SQLException - { - batch.addBatch(); - index = 1; // PreparedStatement index begins from 1 - batchRows++; - batchWeight += 32; // add weight as overhead of each rows - } - - public void close() throws IOException, SQLException - { - // caller should close the connection - } - - public void flush() throws IOException, SQLException - { - logger.info(String.format("Loading %,d rows", batchRows)); - long startTime = System.currentTimeMillis(); - batch.executeBatch(); // here can't use returned value because MySQL Connector/J returns SUCCESS_NO_INFO as a batch result - double seconds = (System.currentTimeMillis() - startTime) / 1000.0; - - totalRows += batchRows; - logger.info(String.format("> %.2f seconds (loaded %,d rows in total)", seconds, totalRows)); - batch.clearBatch(); - batchRows = 0; - batchWeight = 0; - } - - public void finish() throws IOException, SQLException - { - if (getBatchWeight() != 0) { - flush(); - } - } - - public void setNull(int sqlType) throws IOException, SQLException - { - batch.setNull(index, sqlType); - nextColumn(0); - } - - public void setBoolean(boolean v) throws IOException, SQLException - { - batch.setBoolean(index, v); - nextColumn(1); - } - - public void setByte(byte v) throws IOException, SQLException - { - batch.setByte(index, v); - nextColumn(1); - } - - public void setShort(short v) throws IOException, SQLException - { - batch.setShort(index, v); - nextColumn(2); - } - - public void setInt(int v) throws IOException, SQLException - { - batch.setInt(index, v); - nextColumn(4); - } - - public void setLong(long v) throws IOException, SQLException - { - batch.setLong(index, v); - nextColumn(8); - } - - public void setFloat(float v) throws IOException, SQLException - { - batch.setFloat(index, v); - nextColumn(4); - } - - public void setDouble(double v) throws IOException, SQLException - { - batch.setDouble(index, v); - nextColumn(8); - } - - public void setBigDecimal(BigDecimal v) throws IOException, SQLException - { - // use estimated number of necessary bytes + 8 byte for the weight - // assuming one place needs 4 bits. ceil(v.precision() / 2.0) + 8 - batch.setBigDecimal(index, v); - nextColumn((v.precision() & ~2) / 2 + 8); - } - - public void setString(String v) throws IOException, SQLException - { - batch.setString(index, v); - // estimate all chracters use 2 bytes; almost enough for the worst case - nextColumn(v.length() * 2 + 4); - } - - public void setNString(String v) throws IOException, SQLException - { - batch.setNString(index, v); - // estimate all chracters use 2 bytes; almost enough for the worst case - nextColumn(v.length() * 2 + 4); - } - - public void setBytes(byte[] v) throws IOException, SQLException - { - batch.setBytes(index, v); - nextColumn(v.length + 4); - } - - public void setSqlDate(Timestamp v, Calendar cal) throws IOException, SQLException - { - // JavaDoc of java.sql.Time says: - // >> To conform with the definition of SQL DATE, the millisecond values wrapped by a java.sql.Date instance must be 'normalized' by setting the hours, minutes, seconds, and milliseconds to zero in the particular time zone with which the instance is associated. - cal.setTimeInMillis(v.getEpochSecond() * 1000); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.HOUR_OF_DAY, 0); - Date normalized = new Date(cal.getTimeInMillis()); - batch.setDate(index, normalized, cal); - nextColumn(32); - } - - public void setSqlTime(Timestamp v, Calendar cal) throws IOException, SQLException - { - Time t = new Time(v.toEpochMilli()); - batch.setTime(index, t, cal); - nextColumn(32); - } - - public void setSqlTimestamp(Timestamp v, Calendar cal) throws IOException, SQLException - { - java.sql.Timestamp t = new java.sql.Timestamp(v.toEpochMilli()); - t.setNanos(v.getNano()); - batch.setTimestamp(index, t, cal); - nextColumn(32); - } - - private void nextColumn(int weight) - { - index++; - batchWeight += weight + 4; // add weight as overhead of each columns - } -} +package org.embulk.output.jdbc; + +import java.util.List; +import java.util.Calendar; +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Date; +import java.sql.Time; +import com.google.common.base.Optional; +import org.slf4j.Logger; +import org.embulk.spi.time.Timestamp; +import org.embulk.spi.Exec; + +public class StandardBatchInsert + implements BatchInsert +{ + private final Logger logger = Exec.getLogger(StandardBatchInsert.class); + + private final JdbcOutputConnector connector; + private final Optional<List<String>> mergeKeys; + + private JdbcOutputConnection connection; + private PreparedStatement batch; + private int index; + private int batchWeight; + private int batchRows; + private long totalRows; + + public StandardBatchInsert(JdbcOutputConnector connector, Optional<List<String>> mergeKeys) throws IOException, SQLException + { + this.connector = connector; + this.mergeKeys = mergeKeys; + } + + public void prepare(String loadTable, JdbcSchema insertSchema) throws SQLException + { + this.connection = connector.connect(true); + this.index = 1; // PreparedStatement index begings from 1 + this.batchRows = 0; + this.totalRows = 0; + this.batch = prepareStatement(loadTable, insertSchema); + batch.clearBatch(); + } + + protected PreparedStatement prepareStatement(String loadTable, JdbcSchema insertSchema) throws SQLException + { + return connection.prepareBatchInsertStatement(loadTable, insertSchema, mergeKeys); + } + + public int getBatchWeight() + { + return batchWeight; + } + + public void add() throws IOException, SQLException + { + batch.addBatch(); + index = 1; // PreparedStatement index begins from 1 + batchRows++; + batchWeight += 32; // add weight as overhead of each rows + } + + public void close() throws IOException, SQLException + { + // caller should close the connection + } + + public void flush() throws IOException, SQLException + { + logger.info(String.format("Loading %,d rows", batchRows)); + long startTime = System.currentTimeMillis(); + batch.executeBatch(); // here can't use returned value because MySQL Connector/J returns SUCCESS_NO_INFO as a batch result + double seconds = (System.currentTimeMillis() - startTime) / 1000.0; + + totalRows += batchRows; + logger.info(String.format("> %.2f seconds (loaded %,d rows in total)", seconds, totalRows)); + batch.clearBatch(); + batchRows = 0; + batchWeight = 0; + } + + public void finish() throws IOException, SQLException + { + if (getBatchWeight() != 0) { + flush(); + } + } + + public void setNull(int sqlType) throws IOException, SQLException + { + batch.setNull(index, sqlType); + nextColumn(0); + } + + public void setBoolean(boolean v) throws IOException, SQLException + { + batch.setBoolean(index, v); + nextColumn(1); + } + + public void setByte(byte v) throws IOException, SQLException + { + batch.setByte(index, v); + nextColumn(1); + } + + public void setShort(short v) throws IOException, SQLException + { + batch.setShort(index, v); + nextColumn(2); + } + + public void setInt(int v) throws IOException, SQLException + { + batch.setInt(index, v); + nextColumn(4); + } + + public void setLong(long v) throws IOException, SQLException + { + batch.setLong(index, v); + nextColumn(8); + } + + public void setFloat(float v) throws IOException, SQLException + { + batch.setFloat(index, v); + nextColumn(4); + } + + public void setDouble(double v) throws IOException, SQLException + { + batch.setDouble(index, v); + nextColumn(8); + } + + public void setBigDecimal(BigDecimal v) throws IOException, SQLException + { + // use estimated number of necessary bytes + 8 byte for the weight + // assuming one place needs 4 bits. ceil(v.precision() / 2.0) + 8 + batch.setBigDecimal(index, v); + nextColumn((v.precision() & ~2) / 2 + 8); + } + + public void setString(String v) throws IOException, SQLException + { + batch.setString(index, v); + // estimate all chracters use 2 bytes; almost enough for the worst case + nextColumn(v.length() * 2 + 4); + } + + public void setNString(String v) throws IOException, SQLException + { + batch.setNString(index, v); + // estimate all chracters use 2 bytes; almost enough for the worst case + nextColumn(v.length() * 2 + 4); + } + + public void setBytes(byte[] v) throws IOException, SQLException + { + batch.setBytes(index, v); + nextColumn(v.length + 4); + } + + public void setSqlDate(Timestamp v, Calendar cal) throws IOException, SQLException + { + // JavaDoc of java.sql.Time says: + // >> To conform with the definition of SQL DATE, the millisecond values wrapped by a java.sql.Date instance must be 'normalized' by setting the hours, minutes, seconds, and milliseconds to zero in the particular time zone with which the instance is associated. + cal.setTimeInMillis(v.getEpochSecond() * 1000); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.HOUR_OF_DAY, 0); + Date normalized = new Date(cal.getTimeInMillis()); + batch.setDate(index, normalized, cal); + nextColumn(32); + } + + public void setSqlTime(Timestamp v, Calendar cal) throws IOException, SQLException + { + Time t = new Time(v.toEpochMilli()); + batch.setTime(index, t, cal); + nextColumn(32); + } + + public void setSqlTimestamp(Timestamp v, Calendar cal) throws IOException, SQLException + { + java.sql.Timestamp t = new java.sql.Timestamp(v.toEpochMilli()); + t.setNanos(v.getNano()); + batch.setTimestamp(index, t, cal); + nextColumn(32); + } + + private void nextColumn(int weight) + { + index++; + batchWeight += weight + 4; // add weight as overhead of each columns + } +}