src/main/java/org/embulk/output/EmbulkWriteSupport.java in embulk-output-parquet-0.5.0 vs src/main/java/org/embulk/output/EmbulkWriteSupport.java in embulk-output-parquet-0.6.0
- old
+ new
@@ -3,10 +3,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.embulk.spi.Column;
import org.embulk.spi.ColumnVisitor;
@@ -25,15 +26,17 @@
{
final Schema schema;
RecordConsumer consumer;
WriteContext writeContext;
TimestampFormatter[] timestampFormatters;
+ boolean addUTF8;
- public EmbulkWriteSupport(Schema schema, TimestampFormatter[] timestampFormatters)
+ public EmbulkWriteSupport(Schema schema, TimestampFormatter[] timestampFormatters, boolean addUTF8)
{
this.schema = schema;
this.timestampFormatters = timestampFormatters;
+ this.addUTF8 = addUTF8;
}
@Override
public WriteContext init(Configuration configuration)
{
@@ -71,11 +74,16 @@
writeContext = new WriteContext(messageType, metadata);
}
private MessageType convertSchema(Schema schema)
{
- SchemaConvertColumnVisitor visitor = new SchemaConvertColumnVisitor();
+ SchemaConvertColumnVisitor visitor = null;
+ if (addUTF8) {
+ visitor = new SchemaConvertColumnVisitorWithUTF8();
+ } else {
+ visitor = new SchemaConvertColumnVisitor();
+ }
schema.visitColumns(visitor);
String messageName = "embulk";
return new MessageType(messageName, visitor.getConvertedFields());
}
@@ -122,10 +130,16 @@
consumer.addBinary(Binary.fromString(record.getString(column)));
}
}
@Override
+ public void jsonColumn(Column column)
+ {
+ throw new UnsupportedOperationException("This plugin doesn't support json type. Please try to upgrade version of the plugin using 'embulk gem update' command. If the latest version still doesn't support json type, please contact plugin developers, or change configuration of input plugin not to use json type.");
+ }
+
+ @Override
public void timestampColumn(Column column)
{
if (!record.isNull(column)) {
Timestamp t = record.getTimestamp(column);
String formatted = timestampFormatters[column.getIndex()].format(t);
@@ -162,10 +176,16 @@
{
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, column.getName()));
}
@Override
+ public void jsonColumn(Column column)
+ {
+ throw new UnsupportedOperationException("This plugin doesn't support json type. Please try to upgrade version of the plugin using 'embulk gem update' command. If the latest version still doesn't support json type, please contact plugin developers, or change configuration of input plugin not to use json type.");
+ }
+
+ @Override
public void timestampColumn(Column column)
{
// formatted as string
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, column.getName()));
}
@@ -173,6 +193,23 @@
public List<Type> getConvertedFields()
{
return fields;
}
}
+
+ class SchemaConvertColumnVisitorWithUTF8 extends SchemaConvertColumnVisitor
+ {
+ @Override
+ public void stringColumn(Column column)
+ {
+ fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, column.getName(), OriginalType.UTF8));
+ }
+
+ @Override
+ public void timestampColumn(Column column)
+ {
+ // formatted as string
+ fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, column.getName(), OriginalType.UTF8));
+ }
+ }
+
}