src/main/java/org/embulk/output/ParquetOutputPlugin.java: embulk-output-parquet-0.3.0 vs embulk-output-parquet-0.4.0
- old
+ new
@@ -3,10 +3,13 @@
import com.google.common.base.Throwables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
@@ -18,25 +21,20 @@
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
import org.embulk.spi.time.TimestampFormatter;
import org.embulk.spi.util.Timestamps;
-import parquet.hadoop.ParquetWriter;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@SuppressWarnings("unused")
public class ParquetOutputPlugin
-        implements OutputPlugin
-{
+        implements OutputPlugin {
    public interface PluginTask
-            extends Task, TimestampFormatter.Task
-    {
+            extends Task, TimestampFormatter.Task {
        @Config("path_prefix")
        String getPathPrefix();
        @Config("file_ext")
        @ConfigDefault("\".parquet\"")
@@ -45,102 +43,124 @@
@Config("sequence_format")
@ConfigDefault("\".%03d\"")
String getSequenceFormat();
@Config("block_size")
- @ConfigDefault("134217728") // 128M
+ @ConfigDefault("134217728")
+ // 128M
int getBlockSize();
@Config("page_size")
- @ConfigDefault("1048576") // 1M
+ @ConfigDefault("1048576")
+ // 1M
int getPageSize();
@Config("compression_codec")
@ConfigDefault("\"UNCOMPRESSED\"")
String getCompressionCodec();
@Config("column_options")
@ConfigDefault("{}")
Map<String, TimestampColumnOption> getColumnOptions();
+
+ @Config("extra_configurations")
+ @ConfigDefault("{}")
+ Map<String, String> getExtraConfigurations();
+
+ @Config("overwrite")
+ @ConfigDefault("false")
+ boolean getOverwrite();
}
public interface TimestampColumnOption
- extends Task, TimestampFormatter.TimestampColumnOption
- { }
+ extends Task, TimestampFormatter.TimestampColumnOption {
+ }
    public ConfigDiff transaction(ConfigSource config,
-            Schema schema, int processorCount,
-            OutputPlugin.Control control)
-    {
+                                  Schema schema, int processorCount,
+                                  OutputPlugin.Control control) {
        PluginTask task = config.loadConfig(PluginTask.class);
        //TODO
        control.run(task.dump());
        return Exec.newConfigDiff();
    }
    public ConfigDiff resume(TaskSource taskSource,
-            Schema schema, int processorCount,
-            OutputPlugin.Control control)
-    {
+                             Schema schema, int processorCount,
+                             OutputPlugin.Control control) {
        throw new UnsupportedOperationException("parquet output plugin does not support resuming");
    }
    public void cleanup(TaskSource taskSource,
-            Schema schema, int processorCount,
-            List<TaskReport> successTaskReports)
-    {
+                        Schema schema, int processorCount,
+                        List<TaskReport> successTaskReports) {
        //TODO
    }
-    public TransactionalPageOutput open(TaskSource taskSource, final Schema schema, int processorIndex)
-    {
+    public TransactionalPageOutput open(TaskSource taskSource, final Schema schema, int processorIndex) {
        PluginTask task = taskSource.loadTask(PluginTask.class);
+        final PageReader reader = new PageReader(schema);
+        final ParquetWriter<PageReader> writer = createWriter(task, schema, processorIndex);
+
+        return new ParquetTransactionalPageOutput(reader, writer);
+    }
+
+    private String buildPath(PluginTask task, int processorIndex) {
        final String pathPrefix = task.getPathPrefix();
        final String pathSuffix = task.getFileNameExtension();
        final String sequenceFormat = task.getSequenceFormat();
+        return pathPrefix + String.format(sequenceFormat, processorIndex) + pathSuffix;
+    }
+
+    private ParquetWriter<PageReader> createWriter(PluginTask task, Schema schema, int processorIndex) {
+        final TimestampFormatter[] timestampFormatters = Timestamps.newTimestampColumnFormatters(task, schema, task.getColumnOptions());
+
+        final Path path = new Path(buildPath(task, processorIndex));
        final CompressionCodecName codec = CompressionCodecName.valueOf(task.getCompressionCodec());
        final int blockSize = task.getBlockSize();
        final int pageSize = task.getPageSize();
+        final Configuration conf = createConfiguration(task.getExtraConfigurations());
+        final boolean overwrite = task.getOverwrite();
-        final String path = pathPrefix + String.format(sequenceFormat, processorIndex) + pathSuffix;
+        ParquetWriter<PageReader> writer = null;
+        try {
+            EmbulkWriterBuilder builder = new EmbulkWriterBuilder(path, schema, timestampFormatters)
+                    .withCompressionCodec(codec)
+                    .withRowGroupSize(blockSize)
+                    .withPageSize(pageSize)
+                    .withDictionaryPageSize(pageSize)
+                    .withConf(conf);
-        final PageReader reader = new PageReader(schema);
+            if (overwrite) {
+                builder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
+            }
-        final TimestampFormatter[] timestampFormatters = Timestamps.newTimestampColumnFormatters(task, schema, task.getColumnOptions());
-        final EmbulkWriteSupport writeSupport = new EmbulkWriteSupport(schema, timestampFormatters);
-        ParquetWriter<PageReader> writer = createParquetWriter(new Path(path), writeSupport, codec, blockSize, pageSize);
-
-        return new ParquetTransactionalPageOutput(reader, writer);
+            writer = builder.build();
+        } catch (IOException e) {
+            Throwables.propagate(e);
+        }
+        return writer;
    }
- private <T> ParquetWriter<T> createParquetWriter(Path path, WriteSupport<T> writeSupport, CompressionCodecName codec, int blockSize, int pageSize) {
- ParquetWriter<T> writer = null;
-
+ private Configuration createConfiguration(Map<String, String> extra) {
Configuration conf = new Configuration();
+
+ // Default values
conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
conf.set("fs.file.impl", LocalFileSystem.class.getName());
- conf.setClassLoader(this.getClass().getClassLoader());
- try {
- writer = new ParquetWriter<>(
- path,
- writeSupport,
- codec,
- blockSize,
- pageSize,
- pageSize,
- ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
- ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
- ParquetWriter.DEFAULT_WRITER_VERSION,
- conf);
- } catch (IOException e) {
- Throwables.propagate(e);
+ // Optional values
+ for (Map.Entry<String, String> entry : extra.entrySet()) {
+ conf.set(entry.getKey(), entry.getValue());
}
- return writer;
+
+ conf.setClassLoader(this.getClass().getClassLoader());
+
+ return conf;
}
    class ParquetTransactionalPageOutput implements TransactionalPageOutput {
        private PageReader reader;
        private ParquetWriter<PageReader> writer;
@@ -155,10 +175,10 @@
            try {
                reader.setPage(page);
                while (reader.nextRecord()) {
                    writer.write(reader);
                }
-            } catch(IOException e) {
+            } catch (IOException e) {
                Throwables.propagate(e);
            }
        }
        @Override