Sha256: e6517f18bff1fd0110b6f84b29343c10d702d1bed1818d2db5a8e7b51a9920a2

Contents?: true

Size: 1.5 KB

Versions: 2

Compression:

Stored size: 1.5 KB

Contents

package org.embulk.formatter.fast_jsonl

import com.google.common.base.Optional
import scala.collection.JavaConverters._
import org.embulk.formatter.fast_jsonl.json.ColumnVisitor
import org.embulk.spi.{
  FileOutput,
  Page,
  PageReader,
  Schema,
  PageOutput => EmbulkPageOutput
}
import org.embulk.spi.time.TimestampFormatter
import org.embulk.spi.util.LineEncoder

/** Embulk page output that turns every record of each incoming page into one
  * line and writes it through a [[LineEncoder]].
  *
  * Lifecycle as implemented here: the first `add` opens the output file,
  * `finish` finishes the encoder, and `close` releases the page reader and
  * the encoder.
  *
  * @param schema schema whose columns are visited for every record
  * @param task   plugin task; supplies the encoder and timestamp settings as
  *               well as the explode/json column lists and the suffix keys
  * @param output file output the encoded lines are written to
  */
case class PageOutput(schema: Schema, task: PluginTask, output: FileOutput)
    extends EmbulkPageOutput {
  val encoder = new LineEncoder(output, task)
  // One reader is shared by every page handed to `add`; it is released in
  // `close`. (Previously a fresh reader was allocated per page and never
  // closed, leaving this field unused.)
  val reader: PageReader = new PageReader(schema)
  val explodeColumns: Seq[String] = task.getExplodeJsonColumns().asScala
  val jsonColumns: Seq[String] = task.getJsonColumns().asScala
  val suffixKey: Map[String, String] = task.getSuffixKey().asScala.toMap
  // Tracks whether `encoder.nextFile()` has been called yet.
  private var opened: Boolean = false

  val timestampFormatter: TimestampFormatter =
    new TimestampFormatter(task, Optional.absent())

  /** Encodes every record of `page` as one line of output.
    *
    * The output file is opened lazily on the first call, so no file is
    * started when no page ever arrives.
    *
    * @param page page whose records are written
    */
  override def add(page: Page): Unit = {
    if (!opened) {
      encoder.nextFile()
      opened = true
    }
    reader.setPage(page)
    while (reader.nextRecord()) {
      // A new visitor per record, exactly as before: the line is read back
      // from the visitor after the schema walk.
      val visitor =
        ColumnVisitor(reader,
                      timestampFormatter,
                      explodeColumns,
                      jsonColumns,
                      suffixKey)
      schema.visitColumns(visitor)
      encoder.addLine(visitor.getLine)
    }
  }

  /** Finishes the encoder once all pages have been added. */
  override def finish(): Unit = encoder.finish()

  /** Releases the page reader and closes the encoder.
    *
    * This used to call `encoder.finish()` a second time, which left the
    * encoder unclosed. The encoder is closed even when closing the reader
    * throws.
    */
  override def close(): Unit =
    try reader.close()
    finally encoder.close()

}

Version data entries

2 entries across 2 versions & 1 rubygem

Version Path
embulk-formatter-fast_jsonl-0.1.2 src/main/scala/org/embulk/formatter/fast_jsonl/PageOutput.scala
embulk-formatter-fast_jsonl-0.1.1 src/main/scala/org/embulk/formatter/fast_jsonl/PageOutput.scala