src/main/java/org/embulk/output/td/writer/FieldWriterSet.java in embulk-output-td-0.1.6 vs src/main/java/org/embulk/output/td/writer/FieldWriterSet.java in embulk-output-td-0.1.7

- old
+ new

@@ -1,20 +1,28 @@ package org.embulk.output.td.writer; +import java.io.IOException; + +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import com.google.common.base.Throwables; import org.embulk.config.ConfigException; import org.embulk.output.td.TdOutputPlugin; import org.embulk.spi.Column; +import org.embulk.spi.ColumnVisitor; +import org.embulk.spi.PageReader; import org.embulk.spi.Schema; import org.embulk.spi.time.TimestampFormatter; import org.embulk.spi.type.BooleanType; import org.embulk.spi.type.DoubleType; import org.embulk.spi.type.LongType; import org.embulk.spi.type.StringType; import org.embulk.spi.type.TimestampType; import org.embulk.spi.type.Type; import org.embulk.spi.util.Timestamps; +import org.embulk.output.td.MsgpackGZFileBuilder; +import org.embulk.output.td.TdOutputPlugin.TimeValueConfig; import org.slf4j.Logger; public class FieldWriterSet { private enum ColumnWriterMode @@ -24,15 +32,21 @@ DUPLICATE_PRIMARY_KEY; } private final int fieldCount; private final IFieldWriter[] fieldWriters; + private final Optional<TimeValueGenerator> staticTimeValue; public FieldWriterSet(Logger log, TdOutputPlugin.PluginTask task, Schema schema) { Optional<String> userDefinedPrimaryKeySourceColumnName = task.getTimeColumn(); TdOutputPlugin.ConvertTimestampType convertTimestamp = task.getConvertTimestampType(); + Optional<TimeValueConfig> timeValueConfig = task.getTimeValue(); + if (timeValueConfig.isPresent() && userDefinedPrimaryKeySourceColumnName.isPresent()) { + throw new ConfigException("Setting both time_column and time_value is invalid"); + } + boolean hasPkWriter = false; int duplicatePrimaryKeySourceIndex = -1; int firstTimestampColumnIndex = -1; int fc = 0; @@ -60,12 +74,18 @@ // the column name is same with the primary key name. if (userDefinedPrimaryKeySourceColumnName.isPresent()) { columnName = newColumnUniqueName(columnName, schema); mode = ColumnWriterMode.SIMPLE_VALUE; log.warn("time_column '{}' is set but 'time' column also exists. The existent 'time' column is renamed to {}", - userDefinedPrimaryKeySourceColumnName.get(), "time", "time", columnName); + userDefinedPrimaryKeySourceColumnName.get(), columnName); } + else if (timeValueConfig.isPresent()) { + columnName = newColumnUniqueName(columnName, schema); + mode = ColumnWriterMode.SIMPLE_VALUE; + log.warn("time_value is set but 'time' column also exists. The existent 'time' column is renamed to {}", + columnName); + } else { mode = ColumnWriterMode.PRIMARY_KEY; } } else { @@ -141,11 +161,15 @@ fieldWriters[i] = writer; fc += 1; } - if (!hasPkWriter) { + if (timeValueConfig.isPresent()) { + // "time" column is written by RecordWriter + fc += 1; + } + else if (!hasPkWriter) { // PRIMARY_KEY was not found. if (duplicatePrimaryKeySourceIndex < 0) { if (userDefinedPrimaryKeySourceColumnName.isPresent()) { throw new ConfigException(String.format("time_column '%s' does not exist", userDefinedPrimaryKeySourceColumnName.get())); } @@ -193,10 +217,17 @@ // replace existint writer fieldWriters[duplicatePrimaryKeySourceIndex] = writer; fc += 1; } + if (timeValueConfig.isPresent()) { + staticTimeValue = Optional.of(new TimeValueGenerator(timeValueConfig.get())); + } + else { + staticTimeValue = Optional.absent(); + } + fieldCount = fc; } private static String newColumnUniqueName(String originalName, Schema schema) { @@ -216,15 +247,103 @@ } } return false; } + @VisibleForTesting public IFieldWriter getFieldWriter(int index) { return fieldWriters[index]; } - public int getFieldCount() + public void addRecord(final MsgpackGZFileBuilder builder, final PageReader reader) + throws IOException { - return fieldCount; + beginRecord(builder); + + reader.getSchema().visitColumns(new ColumnVisitor() { + @Override + public void booleanColumn(Column column) + { + addColumn(builder, reader, column); + } + + @Override + public void longColumn(Column column) + { + addColumn(builder, reader, column); + } + + @Override + public void doubleColumn(Column column) + { + addColumn(builder, reader, column); + } + + @Override + public void stringColumn(Column column) + { + addColumn(builder, reader, column); + } + + @Override + public void timestampColumn(Column column) + { + addColumn(builder, reader, column); + } + + }); + + endRecord(builder); + } + + private void beginRecord(MsgpackGZFileBuilder builder) + throws IOException + { + builder.writeMapBegin(fieldCount); + if (staticTimeValue.isPresent()) { + builder.writeString("time"); + builder.writeLong(staticTimeValue.get().next()); + } + } + + private void endRecord(MsgpackGZFileBuilder builder) + throws IOException + { + builder.writeMapEnd(); + } + + private void addColumn(MsgpackGZFileBuilder builder, PageReader reader, Column column) + { + try { + fieldWriters[column.getIndex()].writeKeyValue(builder, reader, column); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + static class TimeValueGenerator + { + private final long from; + private final long to; + private long current; + + TimeValueGenerator(TimeValueConfig config) + { + current = from = config.getFrom(); + to = config.getTo(); + } + + long next() + { + try { + return current++; + } + finally { + if (current >= to) { + current = from; + } + } + } } }