src/main/java/org/embulk/output/ParquetOutputPlugin.java: embulk-output-parquet-0.3.0 vs embulk-output-parquet-0.4.0

- old
+ new

@@ -3,10 +3,13 @@
 import com.google.common.base.Throwables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.embulk.config.Config;
 import org.embulk.config.ConfigDefault;
 import org.embulk.config.ConfigDiff;
 import org.embulk.config.ConfigSource;
 import org.embulk.config.Task;
@@ -18,25 +21,20 @@
 import org.embulk.spi.PageReader;
 import org.embulk.spi.Schema;
 import org.embulk.spi.TransactionalPageOutput;
 import org.embulk.spi.time.TimestampFormatter;
 import org.embulk.spi.util.Timestamps;
-import parquet.hadoop.ParquetWriter;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
 @SuppressWarnings("unused")
 public class ParquetOutputPlugin
-        implements OutputPlugin
-{
+        implements OutputPlugin {
     public interface PluginTask
-            extends Task, TimestampFormatter.Task
-    {
+            extends Task, TimestampFormatter.Task {
         @Config("path_prefix")
         String getPathPrefix();
 
         @Config("file_ext")
         @ConfigDefault("\".parquet\"")
@@ -45,102 +43,124 @@
         @Config("sequence_format")
         @ConfigDefault("\".%03d\"")
         String getSequenceFormat();
 
         @Config("block_size")
-        @ConfigDefault("134217728") // 128M
+        @ConfigDefault("134217728")
+        // 128M
         int getBlockSize();
 
         @Config("page_size")
-        @ConfigDefault("1048576") // 1M
+        @ConfigDefault("1048576")
+        // 1M
         int getPageSize();
 
         @Config("compression_codec")
         @ConfigDefault("\"UNCOMPRESSED\"")
         String getCompressionCodec();
 
         @Config("column_options")
         @ConfigDefault("{}")
        Map<String, TimestampColumnOption> getColumnOptions();
+
+        @Config("extra_configurations")
+        @ConfigDefault("{}")
+        Map<String, String> getExtraConfigurations();
+
+        @Config("overwrite")
+        @ConfigDefault("false")
+        boolean getOverwrite();
     }
 
     public interface TimestampColumnOption
-            extends Task, TimestampFormatter.TimestampColumnOption
-    { }
+            extends Task, TimestampFormatter.TimestampColumnOption {
+    }
 
     public ConfigDiff transaction(ConfigSource config,
-            Schema schema, int processorCount,
-            OutputPlugin.Control control)
-    {
+                                  Schema schema, int processorCount,
+                                  OutputPlugin.Control control) {
         PluginTask task = config.loadConfig(PluginTask.class);
 
         //TODO
 
         control.run(task.dump());
         return Exec.newConfigDiff();
     }
 
     public ConfigDiff resume(TaskSource taskSource,
-            Schema schema, int processorCount,
-            OutputPlugin.Control control)
-    {
+                             Schema schema, int processorCount,
+                             OutputPlugin.Control control) {
         throw new UnsupportedOperationException("parquet output plugin does not support resuming");
     }
 
     public void cleanup(TaskSource taskSource,
-            Schema schema, int processorCount,
-            List<TaskReport> successTaskReports)
-    {
+                        Schema schema, int processorCount,
+                        List<TaskReport> successTaskReports) {
         //TODO
     }
 
-    public TransactionalPageOutput open(TaskSource taskSource, final Schema schema, int processorIndex)
-    {
+    public TransactionalPageOutput open(TaskSource taskSource, final Schema schema, int processorIndex) {
         PluginTask task = taskSource.loadTask(PluginTask.class);
 
+        final PageReader reader = new PageReader(schema);
+        final ParquetWriter<PageReader> writer = createWriter(task, schema, processorIndex);
+
+        return new ParquetTransactionalPageOutput(reader, writer);
+    }
+
+    private String buildPath(PluginTask task, int processorIndex) {
         final String pathPrefix = task.getPathPrefix();
         final String pathSuffix = task.getFileNameExtension();
         final String sequenceFormat = task.getSequenceFormat();
+        return pathPrefix + String.format(sequenceFormat, processorIndex) + pathSuffix;
+    }
+
+    private ParquetWriter<PageReader> createWriter(PluginTask task, Schema schema, int processorIndex) {
+        final TimestampFormatter[] timestampFormatters = Timestamps.newTimestampColumnFormatters(task, schema, task.getColumnOptions());
+
+        final Path path = new Path(buildPath(task, processorIndex));
         final CompressionCodecName codec = CompressionCodecName.valueOf(task.getCompressionCodec());
         final int blockSize = task.getBlockSize();
         final int pageSize = task.getPageSize();
+        final Configuration conf = createConfiguration(task.getExtraConfigurations());
+        final boolean overwrite = task.getOverwrite();
 
-        final String path = pathPrefix + String.format(sequenceFormat, processorIndex) + pathSuffix;
+        ParquetWriter<PageReader> writer = null;
+        try {
+            EmbulkWriterBuilder builder = new EmbulkWriterBuilder(path, schema, timestampFormatters)
+                    .withCompressionCodec(codec)
+                    .withRowGroupSize(blockSize)
+                    .withPageSize(pageSize)
+                    .withDictionaryPageSize(pageSize)
+                    .withConf(conf);
 
-        final PageReader reader = new PageReader(schema);
+            if (overwrite) {
+                builder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
+            }
 
-        final TimestampFormatter[] timestampFormatters = Timestamps.newTimestampColumnFormatters(task, schema, task.getColumnOptions());
-        final EmbulkWriteSupport writeSupport = new EmbulkWriteSupport(schema, timestampFormatters);
-        ParquetWriter<PageReader> writer = createParquetWriter(new Path(path), writeSupport, codec, blockSize, pageSize);
-
-        return new ParquetTransactionalPageOutput(reader, writer);
+            writer = builder.build();
+        } catch (IOException e) {
+            Throwables.propagate(e);
+        }
+        return writer;
     }
 
-    private <T> ParquetWriter<T> createParquetWriter(Path path, WriteSupport<T> writeSupport, CompressionCodecName codec, int blockSize, int pageSize) {
-        ParquetWriter<T> writer = null;
-
+    private Configuration createConfiguration(Map<String, String> extra) {
         Configuration conf = new Configuration();
+
+        // Default values
         conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
         conf.set("fs.file.impl", LocalFileSystem.class.getName());
-        conf.setClassLoader(this.getClass().getClassLoader());
 
-        try {
-            writer = new ParquetWriter<>(
-                    path,
-                    writeSupport,
-                    codec,
-                    blockSize,
-                    pageSize,
-                    pageSize,
-                    ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
-                    ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
-                    ParquetWriter.DEFAULT_WRITER_VERSION,
-                    conf);
-        } catch (IOException e) {
-            Throwables.propagate(e);
+        // Optional values
+        for (Map.Entry<String, String> entry : extra.entrySet()) {
+            conf.set(entry.getKey(), entry.getValue());
         }
-        return writer;
+
+        conf.setClassLoader(this.getClass().getClassLoader());
+
+        return conf;
     }
 
     class ParquetTransactionalPageOutput implements TransactionalPageOutput {
         private PageReader reader;
         private ParquetWriter<PageReader> writer;
@@ -155,10 +175,10 @@
             try {
                 reader.setPage(page);
                 while (reader.nextRecord()) {
                    writer.write(reader);
                }
-            } catch(IOException e) {
+            } catch (IOException e) {
                Throwables.propagate(e);
            }
        }
        @Override
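Note: the new createWriter() depends on an EmbulkWriterBuilder class that is not shown in this diff. Assuming it follows the standard org.apache.parquet.hadoop.ParquetWriter.Builder subclassing pattern (which is what supplies withCompressionCodec, withRowGroupSize, withPageSize, withDictionaryPageSize, withConf, withWriteMode, and build), a minimal sketch could look like the following; the constructor signature and the reuse of EmbulkWriteSupport are assumptions, not code taken from the 0.4.0 release.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.time.TimestampFormatter;

// Hypothetical sketch: the real EmbulkWriterBuilder is defined elsewhere in the plugin.
public class EmbulkWriterBuilder extends ParquetWriter.Builder<PageReader, EmbulkWriterBuilder> {
    private final Schema schema;
    private final TimestampFormatter[] timestampFormatters;

    public EmbulkWriterBuilder(Path path, Schema schema, TimestampFormatter[] timestampFormatters) {
        super(path);
        this.schema = schema;
        this.timestampFormatters = timestampFormatters;
    }

    @Override
    protected EmbulkWriterBuilder self() {
        return this;
    }

    @Override
    protected WriteSupport<PageReader> getWriteSupport(Configuration conf) {
        // Assumption: the existing EmbulkWriteSupport is reused, as the removed createParquetWriter() did.
        return new EmbulkWriteSupport(schema, timestampFormatters);
    }
}

Compared with the removed createParquetWriter(), which passed ten positional arguments to the ParquetWriter constructor, the builder only overrides what the task configures and leaves dictionary encoding, validation, and writer version at their defaults.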