Sha256: 91f7fd3511299ff9c9d1aa1c082bdd68cae0a29f18580ffece9b2382132f141b

Contents?: true

Size: 1.51 KB

Versions: 1

Stored size: 1.51 KB

Contents

package org.embulk.output.s3_parquet.parquet


import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.MessageType
import org.embulk.spi.{PageReader, Schema}
import org.embulk.spi.time.TimestampFormatter

import scala.jdk.CollectionConverters._


private[parquet] case class ParquetFileWriteSupport(schema: Schema,
                                                    timestampFormatters: Seq[TimestampFormatter],
                                                    logicalTypeHandlers: LogicalTypeHandlerStore = LogicalTypeHandlerStore.empty)
    extends WriteSupport[PageReader]
{

    // Set by prepareForWrite; wraps the RecordConsumer currently in use.
    private var currentParquetFileWriter: ParquetFileWriter = _

    // Called once when the writer is initialized: declares the file's Parquet
    // schema and any extra key-value metadata for the footer.
    override def init(configuration: Configuration): WriteContext =
    {
        val messageType: MessageType = EmbulkMessageType.builder()
            .withSchema(schema)
            .withLogicalTypeHandlers(logicalTypeHandlers)
            .build()
        val metadata: Map[String, String] = Map.empty // NOTE: written verbatim into the file footer as extra key-value metadata; nothing extra is needed here
        new WriteContext(messageType, metadata.asJava)
    }

    // Called by parquet-mr at the start of each row group, handing over the
    // RecordConsumer through which records are emitted.
    override def prepareForWrite(recordConsumer: RecordConsumer): Unit =
    {
        currentParquetFileWriter = ParquetFileWriter(recordConsumer, schema, timestampFormatters, logicalTypeHandlers)
    }

    // Called once per record; `record` is the PageReader positioned at the
    // row to write.
    override def write(record: PageReader): Unit =
    {
        currentParquetFileWriter.write(record)
    }
}
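
For orientation, a minimal sketch of how a WriteSupport like this one is typically wired into parquet-mr's ParquetWriter: the writer's Builder hands back a WriteSupport from getWriteSupport(), and parquet-mr then drives the init/prepareForWrite/write callbacks defined above. ExampleBuilder and the usage comments below are illustrative assumptions, not code from this gem.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.api.WriteSupport
import org.embulk.spi.{PageReader, Schema}
import org.embulk.spi.time.TimestampFormatter

// Hypothetical builder: parquet-mr calls getWriteSupport() once at build()
// time and drives the WriteSupport callbacks through the returned instance.
case class ExampleBuilder(path: Path,
                          schema: Schema,
                          timestampFormatters: Seq[TimestampFormatter],
                          logicalTypeHandlers: LogicalTypeHandlerStore = LogicalTypeHandlerStore.empty)
    extends ParquetWriter.Builder[PageReader, ExampleBuilder](path)
{
    override def self(): ExampleBuilder = this

    override def getWriteSupport(conf: Configuration): WriteSupport[PageReader] =
        ParquetFileWriteSupport(schema, timestampFormatters, logicalTypeHandlers)
}

// Hypothetical write loop: one write() per record, with the PageReader
// positioned on the record being written.
//   val writer = ExampleBuilder(new Path("/tmp/out.parquet"), schema, formatters).build()
//   while (reader.nextRecord()) writer.write(reader)
//   writer.close()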

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
embulk-output-s3_parquet-0.1.0 src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriteSupport.scala