Sha256: 8abf08f9c3eb165dd3c5db93a14094a771533273357f65c645312b9427a0087d
Contents?: true
Size: 1.68 KB
Versions: 1
Compression:
Stored size: 1.68 KB
Contents
package org.embulk.output.fluentd import com.google.common.base.Optional import org.embulk.config.{TaskReport, TaskSource} import org.embulk.output.fluentd.sender.Sender import org.embulk.spi._ import org.embulk.spi.time.TimestampFormatter case class FluentdTransactionalPageOutput(taskSource: TaskSource, schema: Schema, taskIndex: Int, taskCountOpt: Option[Int], sender: Sender) extends TransactionalPageOutput { val task: PluginTask = taskSource.loadTask(classOf[PluginTask]) val logger = Exec.getLogger(classOf[FluentdTransactionalPageOutput]) def timestampFormatter(): TimestampFormatter = new TimestampFormatter(task, Optional.absent()) override def add(page: Page): Unit = { sender(asIterator(page).toSeq) } def asIterator(page: Page): Iterator[Map[String, AnyRef]] = { val reader: PageReader = new PageReader(schema) reader.setPage(page) Iterator.continually { if (reader.nextRecord()) { val visitor = ColumnVisitor(reader, timestampFormatter()) schema.visitColumns(visitor) visitor.getRecord } else { reader.close() Map.empty[String, AnyRef] } } takeWhile (_ != Map.empty[String, AnyRef]) } override def commit(): TaskReport = Exec.newTaskReport override def abort(): Unit = () override def finish(): Unit = { logger.debug(s"finished at " + this) // for map/reduce executor. if (taskCountOpt.isEmpty) { // close immediately. sender.close() } } override def close(): Unit = () }
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
embulk-output-fluentd-0.1.0 | src/main/scala/org/embulk/output/fluentd/FluentdTransactionalPageOutput.scala |