src/main/java/org/embulk/output/EmbulkWriteSupport.java in embulk-output-parquet-0.4.0 vs src/main/java/org/embulk/output/EmbulkWriteSupport.java in embulk-output-parquet-0.5.0
- old
+ new
@@ -18,36 +18,42 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class EmbulkWriteSupport extends WriteSupport<PageReader> {
+public class EmbulkWriteSupport
+ extends WriteSupport<PageReader>
+{
final Schema schema;
RecordConsumer consumer;
WriteContext writeContext;
TimestampFormatter[] timestampFormatters;
- public EmbulkWriteSupport(Schema schema, TimestampFormatter[] timestampFormatters) {
+ public EmbulkWriteSupport(Schema schema, TimestampFormatter[] timestampFormatters)
+ {
this.schema = schema;
this.timestampFormatters = timestampFormatters;
}
@Override
- public WriteContext init(Configuration configuration) {
+ public WriteContext init(Configuration configuration)
+ {
if (writeContext == null) {
init();
}
return writeContext;
}
@Override
- public void prepareForWrite(RecordConsumer recordConsumer) {
+ public void prepareForWrite(RecordConsumer recordConsumer)
+ {
this.consumer = recordConsumer;
}
@Override
- public void write(PageReader record) {
+ public void write(PageReader record)
+ {
final ColumnVisitor visitor = new ParquetColumnVisitor(record, consumer);
consumer.startMessage();
for (Column c : schema.getColumns()) {
if (!record.isNull(c)) {
consumer.startField(c.getName(), c.getIndex());
@@ -56,99 +62,117 @@
}
}
consumer.endMessage();
}
- private void init() {
+ private void init()
+ {
MessageType messageType = convertSchema(schema);
Map<String, String> metadata = new HashMap<>();
writeContext = new WriteContext(messageType, metadata);
}
- private MessageType convertSchema(Schema schema) {
+ private MessageType convertSchema(Schema schema)
+ {
SchemaConvertColumnVisitor visitor = new SchemaConvertColumnVisitor();
schema.visitColumns(visitor);
String messageName = "embulk";
return new MessageType(messageName, visitor.getConvertedFields());
}
- class ParquetColumnVisitor implements ColumnVisitor {
+ class ParquetColumnVisitor
+ implements ColumnVisitor
+ {
final PageReader record;
final RecordConsumer consumer;
- public ParquetColumnVisitor(PageReader record, RecordConsumer consumer) {
+ public ParquetColumnVisitor(PageReader record, RecordConsumer consumer)
+ {
this.record = record;
this.consumer = consumer;
}
@Override
- public void booleanColumn(Column column) {
+ public void booleanColumn(Column column)
+ {
if (!record.isNull(column)) {
consumer.addBoolean(record.getBoolean(column));
}
}
@Override
- public void longColumn(Column column) {
+ public void longColumn(Column column)
+ {
if (!record.isNull(column)) {
consumer.addLong(record.getLong(column));
}
}
@Override
- public void doubleColumn(Column column) {
+ public void doubleColumn(Column column)
+ {
if (!record.isNull(column)) {
consumer.addDouble(record.getDouble(column));
}
}
@Override
- public void stringColumn(Column column) {
+ public void stringColumn(Column column)
+ {
if (!record.isNull(column)) {
consumer.addBinary(Binary.fromString(record.getString(column)));
}
}
@Override
- public void timestampColumn(Column column) {
+ public void timestampColumn(Column column)
+ {
if (!record.isNull(column)) {
Timestamp t = record.getTimestamp(column);
String formatted = timestampFormatters[column.getIndex()].format(t);
consumer.addBinary(Binary.fromString(formatted));
}
}
}
- class SchemaConvertColumnVisitor implements ColumnVisitor {
+ class SchemaConvertColumnVisitor
+ implements ColumnVisitor
+ {
List<Type> fields = new ArrayList<>();
@Override
- public void booleanColumn(Column column) {
+ public void booleanColumn(Column column)
+ {
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BOOLEAN, column.getName()));
}
@Override
- public void longColumn(Column column) {
+ public void longColumn(Column column)
+ {
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT64, column.getName()));
}
@Override
- public void doubleColumn(Column column) {
+ public void doubleColumn(Column column)
+ {
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, column.getName()));
}
@Override
- public void stringColumn(Column column) {
+ public void stringColumn(Column column)
+ {
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, column.getName()));
}
@Override
- public void timestampColumn(Column column) {
+ public void timestampColumn(Column column)
+ {
// formatted as string
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, column.getName()));
}
- public List<Type> getConvertedFields() {
+ public List<Type> getConvertedFields()
+ {
return fields;
}
}
}