package org.embulk.input.mysql.getter; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Optional; import org.embulk.config.ConfigSource; import org.embulk.config.Task; import org.embulk.input.jdbc.getter.AbstractIncrementalHandler; import org.embulk.input.jdbc.getter.ColumnGetter; import org.embulk.spi.Column; import org.embulk.spi.Exec; import org.embulk.spi.time.TimestampFormatter; import org.embulk.spi.time.TimestampParser; import org.joda.time.DateTimeZone; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; public abstract class AbstractMySQLTimestampIncrementalHandler extends AbstractIncrementalHandler { protected final DateTimeZone sessionTimeZone; protected long epochSecond; protected int nano; public AbstractMySQLTimestampIncrementalHandler(DateTimeZone sessionTimeZone, ColumnGetter next) { super(next); this.sessionTimeZone = sessionTimeZone; } @Override public void getAndSet(ResultSet from, int fromIndex, Column toColumn) throws SQLException { Timestamp timestamp = from.getTimestamp(fromIndex); if (timestamp != null) { epochSecond = timestamp.getTime() / 1000; nano = timestamp.getNanos(); } super.getAndSet(from, fromIndex, toColumn); } private static interface FormatterIntlTask extends Task, TimestampFormatter.Task {} private static interface FormatterIntlColumnOption extends Task, TimestampFormatter.TimestampColumnOption {} @Override public JsonNode encodeToJson() { // TODO: Switch to a newer TimestampFormatter constructor after a reasonable interval. // Traditional constructor is used here for compatibility. final ConfigSource configSource = Exec.newConfigSource(); configSource.set("format", getTimestampFormat()); configSource.set("timezone", "UTC"); TimestampFormatter formatter = new TimestampFormatter( Exec.newConfigSource().loadConfig(FormatterIntlTask.class), Optional.fromNullable(configSource.loadConfig(FormatterIntlColumnOption.class))); String text = formatter.format(utcTimestampFromSessionTime(epochSecond, nano)); return jsonNodeFactory.textNode(text); } protected abstract String getTimestampFormat(); protected abstract org.embulk.spi.time.Timestamp utcTimestampFromSessionTime(long epochSecond, int nano); private static interface ParserIntlTask extends Task, TimestampParser.Task {} private static interface ParserIntlColumnOption extends Task, TimestampParser.TimestampColumnOption {} @Override public void decodeFromJsonTo(PreparedStatement toStatement, int toIndex, JsonNode fromValue) throws SQLException { // TODO: Switch to a newer TimestampParser constructor after a reasonable interval. // Traditional constructor is used here for compatibility. final ConfigSource configSource = Exec.newConfigSource(); configSource.set("format", getTimestampPattern()); configSource.set("timezone", "UTC"); TimestampParser parser = new TimestampParser( Exec.newConfigSource().loadConfig(ParserIntlTask.class), configSource.loadConfig(ParserIntlColumnOption.class)); org.embulk.spi.time.Timestamp epoch = parser.parse(fromValue.asText()); toStatement.setTimestamp(toIndex, utcTimestampToSessionTime(epoch)); } protected abstract String getTimestampPattern(); protected abstract Timestamp utcTimestampToSessionTime(org.embulk.spi.time.Timestamp ts); }