Sha256: 356f3335d71bc9b69fc8c65fa87089803130235fae2a45952299a040b2782a3d

Contents?: true

Size: 1.88 KB

Versions: 2

Compression:

Stored size: 1.88 KB

Contents

package org.embulk.output.s3_parquet


import java.io.File
import java.nio.file.{Files, Paths}

import com.amazonaws.services.s3.transfer.{TransferManager, Upload}
import com.amazonaws.services.s3.transfer.model.UploadResult
import org.apache.parquet.hadoop.ParquetWriter
import org.embulk.config.TaskReport
import org.embulk.output.s3_parquet.aws.Aws
import org.embulk.spi.{Exec, Page, PageReader, TransactionalPageOutput}


/**
 * Page output that writes incoming records to a local Parquet file and, on commit,
 * uploads that file to S3 before deleting the local copy.
 *
 * @param outputLocalFile path of the local file that is uploaded on commit and
 *                        deleted during cleanup
 * @param reader          page reader that is pointed at each incoming page and handed
 *                        to the writer once per record
 * @param writer          Parquet writer that receives the reader for every record
 * @param aws             helper that supplies a `TransferManager` for the upload
 * @param destBucket      destination S3 bucket name
 * @param destKey         destination S3 object key
 */
case class S3ParquetPageOutput(outputLocalFile: String,
                               reader: PageReader,
                               writer: ParquetWriter[PageReader],
                               aws: Aws,
                               destBucket: String,
                               destKey: String)
    extends TransactionalPageOutput
{

    // Guards against closing the writer more than once; only accessed inside `synchronized`.
    private var isClosed: Boolean = false

    /**
     * Writes every record of the given page through the Parquet writer.
     *
     * @param page page whose records are written
     */
    override def add(page: Page): Unit =
    {
        reader.setPage(page)
        while (reader.nextRecord()) {
            writer.write(reader)
        }
    }

    /** Intentionally empty: the writer is closed in [[close]], which commit and abort call. */
    override def finish(): Unit =
    {
    }

    /**
     * Closes the Parquet writer at most once. Later calls are no-ops once the first
     * close has succeeded.
     */
    override def close(): Unit =
    {
        synchronized {
            if (!isClosed) {
                writer.close()
                // Only marked closed after a successful close, so a failed close can be retried.
                isClosed = true
            }
        }
    }

    /**
     * Closes the writer and removes the local file without uploading anything.
     *
     * The local file is removed even when closing the writer throws; in that case the
     * exception from closing still propagates to the caller (unless the cleanup itself
     * fails, in which case the cleanup failure is the one that surfaces).
     */
    override def abort(): Unit =
    {
        try {
            close()
        }
        finally {
            cleanup()
        }
    }

    /**
     * Closes the writer, uploads the local file to `destBucket`/`destKey`, waits for the
     * upload to complete, and then deletes the local file.
     *
     * If the upload fails, the exception propagates and the local file is left in place.
     * NOTE(review): presumably the framework then calls [[abort]], which removes it -
     * confirm against the Embulk executor.
     *
     * @return a task report with the `bucket`, `key`, `etag` and `version_id` of the
     *         uploaded object
     */
    override def commit(): TaskReport =
    {
        close()
        val result: UploadResult = aws.withTransferManager { xfer: TransferManager =>
            val upload: Upload = xfer.upload(destBucket, destKey, new File(outputLocalFile))
            // Block until the transfer finishes so the local file can be removed safely.
            upload.waitForUploadResult()
        }
        cleanup()
        Exec.newTaskReport()
            .set("bucket", result.getBucketName)
            .set("key", result.getKey)
            .set("etag", result.getETag)
            .set("version_id", result.getVersionId)
    }

    /**
     * Deletes the local output file if it is still present.
     *
     * Uses `deleteIfExists` so that repeated cleanup (for example an abort after the file
     * has already been removed) does not fail with `NoSuchFileException`.
     */
    private def cleanup(): Unit =
    {
        Files.deleteIfExists(Paths.get(outputLocalFile))
    }
}

Version data entries

2 entries across 2 versions & 1 rubygem

Version Path
embulk-output-s3_parquet-0.1.0 src/main/scala/org/embulk/output/s3_parquet/S3ParquetPageOutput.scala
embulk-output-s3_parquet-0.0.3 src/main/scala/org/embulk/output/s3_parquet/S3ParquetPageOutput.scala