Sha256: 356f3335d71bc9b69fc8c65fa87089803130235fae2a45952299a040b2782a3d
Contents?: true
Size: 1.88 KB
Versions: 2
Compression:
Stored size: 1.88 KB
Contents
package org.embulk.output.s3_parquet

import java.io.File
import java.nio.file.{Files, Paths}

import com.amazonaws.services.s3.transfer.{TransferManager, Upload}
import com.amazonaws.services.s3.transfer.model.UploadResult
import org.apache.parquet.hadoop.ParquetWriter
import org.embulk.config.TaskReport
import org.embulk.output.s3_parquet.aws.Aws
import org.embulk.spi.{Exec, Page, PageReader, TransactionalPageOutput}

/**
 * A [[TransactionalPageOutput]] that streams Embulk pages into a local Parquet
 * file and, on a successful commit, uploads that file to Amazon S3.
 *
 * Lifecycle: `add` is called per page, then either `commit` (close writer,
 * upload, delete local file, report S3 metadata) or `abort` (close writer,
 * delete local file).
 *
 * @param outputLocalFile path of the temporary Parquet file written locally
 * @param reader          page reader used to iterate the records of each page
 * @param writer          Parquet writer that consumes the reader record by record
 * @param aws             AWS facade supplying a configured TransferManager
 * @param destBucket      destination S3 bucket name
 * @param destKey         destination S3 object key
 */
case class S3ParquetPageOutput(outputLocalFile: String,
                               reader: PageReader,
                               writer: ParquetWriter[PageReader],
                               aws: Aws,
                               destBucket: String,
                               destKey: String)
    extends TransactionalPageOutput {

  // Guards against closing the underlying Parquet writer more than once;
  // both abort() and commit() funnel through close().
  private var isClosed: Boolean = false

  /** Writes every record of the given page to the local Parquet file. */
  override def add(page: Page): Unit = {
    reader.setPage(page)
    while (reader.nextRecord()) {
      writer.write(reader)
    }
  }

  override def finish(): Unit = {}

  /** Closes the Parquet writer exactly once; safe to call repeatedly. */
  override def close(): Unit = {
    synchronized {
      if (!isClosed) {
        writer.close()
        isClosed = true
      }
    }
  }

  /** Discards the in-progress output: closes the writer and removes the local file. */
  override def abort(): Unit = {
    close()
    cleanup()
  }

  /**
   * Finalizes the output: closes the writer, uploads the local Parquet file to
   * S3 (blocking until the transfer completes), deletes the local file, and
   * returns a task report describing the uploaded object.
   */
  override def commit(): TaskReport = {
    close()
    val result: UploadResult = aws.withTransferManager { xfer: TransferManager =>
      val upload: Upload = xfer.upload(destBucket, destKey, new File(outputLocalFile))
      // waitForUploadResult blocks until the transfer finishes or fails.
      upload.waitForUploadResult()
    }
    cleanup()
    Exec.newTaskReport()
      .set("bucket", result.getBucketName)
      .set("key", result.getKey)
      .set("etag", result.getETag)
      .set("version_id", result.getVersionId)
  }

  // Best-effort removal of the local temp file. deleteIfExists (rather than
  // delete) avoids a NoSuchFileException when cleanup runs after the file is
  // already gone — e.g. abort() invoked after a commit() that deleted it, or
  // when the writer never produced the file.
  private def cleanup(): Unit = {
    Files.deleteIfExists(Paths.get(outputLocalFile))
  }
}
Version data entries
2 entries across 2 versions & 1 rubygems