src/main/java/org/embulk/output/td/writer/FieldWriterSet.java in embulk-output-td-0.1.6 vs src/main/java/org/embulk/output/td/writer/FieldWriterSet.java in embulk-output-td-0.1.7
- old
+ new
@@ -1,20 +1,28 @@
package org.embulk.output.td.writer;
+import java.io.IOException;
+
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
import org.embulk.config.ConfigException;
import org.embulk.output.td.TdOutputPlugin;
import org.embulk.spi.Column;
+import org.embulk.spi.ColumnVisitor;
+import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.time.TimestampFormatter;
import org.embulk.spi.type.BooleanType;
import org.embulk.spi.type.DoubleType;
import org.embulk.spi.type.LongType;
import org.embulk.spi.type.StringType;
import org.embulk.spi.type.TimestampType;
import org.embulk.spi.type.Type;
import org.embulk.spi.util.Timestamps;
+import org.embulk.output.td.MsgpackGZFileBuilder;
+import org.embulk.output.td.TdOutputPlugin.TimeValueConfig;
import org.slf4j.Logger;
public class FieldWriterSet
{
private enum ColumnWriterMode
@@ -24,15 +32,21 @@
DUPLICATE_PRIMARY_KEY;
}
private final int fieldCount;
private final IFieldWriter[] fieldWriters;
+ private final Optional<TimeValueGenerator> staticTimeValue;
public FieldWriterSet(Logger log, TdOutputPlugin.PluginTask task, Schema schema)
{
Optional<String> userDefinedPrimaryKeySourceColumnName = task.getTimeColumn();
TdOutputPlugin.ConvertTimestampType convertTimestamp = task.getConvertTimestampType();
+ Optional<TimeValueConfig> timeValueConfig = task.getTimeValue();
+ if (timeValueConfig.isPresent() && userDefinedPrimaryKeySourceColumnName.isPresent()) {
+ throw new ConfigException("Setting both time_column and time_value is invalid");
+ }
+
boolean hasPkWriter = false;
int duplicatePrimaryKeySourceIndex = -1;
int firstTimestampColumnIndex = -1;
int fc = 0;
@@ -60,12 +74,18 @@
// the column name is same with the primary key name.
if (userDefinedPrimaryKeySourceColumnName.isPresent()) {
columnName = newColumnUniqueName(columnName, schema);
mode = ColumnWriterMode.SIMPLE_VALUE;
log.warn("time_column '{}' is set but 'time' column also exists. The existent 'time' column is renamed to {}",
- userDefinedPrimaryKeySourceColumnName.get(), "time", "time", columnName);
+ userDefinedPrimaryKeySourceColumnName.get(), columnName);
}
+ else if (timeValueConfig.isPresent()) {
+ columnName = newColumnUniqueName(columnName, schema);
+ mode = ColumnWriterMode.SIMPLE_VALUE;
+ log.warn("time_value is set but 'time' column also exists. The existent 'time' column is renamed to {}",
+ columnName);
+ }
else {
mode = ColumnWriterMode.PRIMARY_KEY;
}
}
else {
@@ -141,11 +161,15 @@
fieldWriters[i] = writer;
fc += 1;
}
- if (!hasPkWriter) {
+ if (timeValueConfig.isPresent()) {
+ // "time" column is written by RecordWriter
+ fc += 1;
+ }
+ else if (!hasPkWriter) {
// PRIMARY_KEY was not found.
if (duplicatePrimaryKeySourceIndex < 0) {
if (userDefinedPrimaryKeySourceColumnName.isPresent()) {
throw new ConfigException(String.format("time_column '%s' does not exist", userDefinedPrimaryKeySourceColumnName.get()));
}
@@ -193,10 +217,17 @@
// replace existing writer
fieldWriters[duplicatePrimaryKeySourceIndex] = writer;
fc += 1;
}
+ if (timeValueConfig.isPresent()) {
+ staticTimeValue = Optional.of(new TimeValueGenerator(timeValueConfig.get()));
+ }
+ else {
+ staticTimeValue = Optional.absent();
+ }
+
fieldCount = fc;
}
private static String newColumnUniqueName(String originalName, Schema schema)
{
@@ -216,15 +247,103 @@
}
}
return false;
}
+ @VisibleForTesting
// Returns the writer held at position `index` of the fieldWriters array.
// No range check is done here, so an out-of-range index fails with the array's own
// ArrayIndexOutOfBoundsException. addColumn looks writers up by column.getIndex(),
// so `index` is presumably the schema column index -- confirm against callers.
public IFieldWriter getFieldWriter(int index)
{
return fieldWriters[index];
}
- public int getFieldCount()
+ public void addRecord(final MsgpackGZFileBuilder builder, final PageReader reader) // writes one map to builder: beginRecord, one addColumn per column of reader.getSchema(), endRecord
+ throws IOException
{
- return fieldCount;
+ beginRecord(builder); // map header sized by fieldCount, plus the generated "time" entry when staticTimeValue is present
+
+ reader.getSchema().visitColumns(new ColumnVisitor() { // every column type below is routed to the same addColumn helper
+ @Override
+ public void booleanColumn(Column column)
+ {
+ addColumn(builder, reader, column);
+ }
+
+ @Override
+ public void longColumn(Column column)
+ {
+ addColumn(builder, reader, column);
+ }
+
+ @Override
+ public void doubleColumn(Column column)
+ {
+ addColumn(builder, reader, column);
+ }
+
+ @Override
+ public void stringColumn(Column column)
+ {
+ addColumn(builder, reader, column);
+ }
+
+ @Override
+ public void timestampColumn(Column column)
+ {
+ addColumn(builder, reader, column);
+ }
+
+ });
+
+ endRecord(builder); // closes the map opened by beginRecord
+ }
+
+ private void beginRecord(MsgpackGZFileBuilder builder)
+ throws IOException
+ {
+ builder.writeMapBegin(fieldCount);
+ if (staticTimeValue.isPresent()) {
+ builder.writeString("time");
+ builder.writeLong(staticTimeValue.get().next());
+ }
+ }
+
+ private void endRecord(MsgpackGZFileBuilder builder)
+ throws IOException
+ {
+ builder.writeMapEnd(); // closes the map that beginRecord opened with writeMapBegin(fieldCount)
+ }
+
+ private void addColumn(MsgpackGZFileBuilder builder, PageReader reader, Column column)
+ {
+ try {
+ fieldWriters[column.getIndex()].writeKeyValue(builder, reader, column);
+ }
+ catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
+  * Hands out consecutive {@code long} values beginning at {@code from} and wraps
+  * back to {@code from} as soon as the next candidate is {@code >= to}.
+  * <p>
+  * The cursor is a plain mutable field with no synchronization.
+  * <p>
+  * NOTE(review): for {@code from < to} the values produced are {@code from .. to - 1},
+  * i.e. {@code to} itself is never returned -- confirm that an exclusive upper
+  * bound is what the time_value configuration intends.
+  */
+ static class TimeValueGenerator
+ {
+     private final long from;
+     private final long to;
+     private long current;
+
+     /**
+      * @param config supplies the range through {@code getFrom()} and {@code getTo()}
+      */
+     TimeValueGenerator(TimeValueConfig config)
+     {
+         this.from = config.getFrom();
+         this.to = config.getTo();
+         this.current = this.from;
+     }
+
+     /**
+      * Returns the current value and advances the cursor, resetting it to
+      * {@code from} when the advanced cursor reaches or passes {@code to}.
+      */
+     long next()
+     {
+         final long issued = current;
+         current = issued + 1;
+         if (current >= to) {
+             current = from;
+         }
+         return issued;
+     }
+ }
}