Sha256: e6517f18bff1fd0110b6f84b29343c10d702d1bed1818d2db5a8e7b51a9920a2
Contents?: true
Size: 1.5 KB
Versions: 2
Compression:
Stored size: 1.5 KB
Contents
package org.embulk.formatter.fast_jsonl import com.google.common.base.Optional import scala.collection.JavaConverters._ import org.embulk.formatter.fast_jsonl.json.ColumnVisitor import org.embulk.spi.{ FileOutput, Page, PageReader, Schema, PageOutput => EmbulkPageOutput } import org.embulk.spi.time.TimestampFormatter import org.embulk.spi.util.LineEncoder case class PageOutput(schema: Schema, task: PluginTask, output: FileOutput) extends EmbulkPageOutput { val encoder = new LineEncoder(output, task) val reader: PageReader = new PageReader(schema) val explodeColumns: Seq[String] = task.getExplodeJsonColumns().asScala val jsonColumns: Seq[String] = task.getJsonColumns().asScala val suffixKey: Map[String, String] = task.getSuffixKey().asScala.toMap private var opened: Boolean = false val timestampFormatter: TimestampFormatter = new TimestampFormatter(task, Optional.absent()) override def add(page: Page): Unit = { if (!opened) { encoder.nextFile() opened = true } val reader: PageReader = new PageReader(schema) reader.setPage(page) while (reader.nextRecord()) { val visitor = ColumnVisitor(reader, timestampFormatter, explodeColumns, jsonColumns, suffixKey) schema.visitColumns(visitor) encoder.addLine(visitor.getLine) } () } override def finish(): Unit = encoder.finish() override def close(): Unit = encoder.finish() }
Version data entries
2 entries across 2 versions & 1 rubygems