Sha256: 0adecdfa6d50a2d3f0b4c1137e08a07651f517255a926b34a696edb4f9487981

Contents?: true

Size: 1.43 KB

Versions: 1

Compression:

Stored size: 1.43 KB

Contents

package org.embulk.output.fluentd

import org.embulk.spi.time.TimestampFormatter
import org.embulk.spi.{Column, PageReader, ColumnVisitor => EmbulkColumnVisitor}

case class ColumnVisitor(reader: PageReader, timestampFormatter: TimestampFormatter) extends EmbulkColumnVisitor {
  import scala.collection.mutable

  private val record = mutable.Map[String, AnyRef]()

  override def timestampColumn(column: Column): Unit =
    value(column, reader.getTimestamp).foreach(v => put(column, timestampFormatter.format(v)))

  override def stringColumn(column: Column): Unit =
    value(column, reader.getString).foreach(v => put(column, v))

  override def longColumn(column: Column): Unit =
    value(column, reader.getLong).foreach(v => put(column, Long.box(v)))

  override def doubleColumn(column: Column): Unit =
    value(column, reader.getDouble).foreach(v => put(column, Double.box(v)))

  override def booleanColumn(column: Column): Unit =
    value(column, reader.getBoolean).foreach(v => put(column, Boolean.box(v)))

  override def jsonColumn(column: Column): Unit =
    value(column, reader.getJson).foreach(v => put(column, v.toJson))

  def value[A](column: Column, method: => (Column => A)): Option[A] =
    if (reader.isNull(column)) {
      None
    } else {
      Some(method(column))
    }

  def put[A <: AnyRef](column: Column, value: A): Unit = {
    record.put(column.getName, value)
    ()
  }

  def getRecord: Map[String, AnyRef] = record.toMap

}

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
embulk-output-fluentd-0.1.0 src/main/scala/org/embulk/output/fluentd/ColumnVisitor.scala