Sha256: 80be007f5ccb4c39f7ebbc560196131286a066a20c4616371888645720e0cb4d

Contents?: true

Size: 1.96 KB

Versions: 1

Compression:

Stored size: 1.96 KB

Contents

package org.embulk.output.fluentd

import java.util

import org.embulk.config._
import org.embulk.output.fluentd.sender._
import org.embulk.spi._
import wvlet.log._

class FluentdOutputPlugin extends OutputPlugin {

  override def transaction(config: ConfigSource,
                           schema: Schema,
                           taskCount: Int,
                           control: OutputPlugin.Control): ConfigDiff = {
    Logger.setDefaultLogLevel(LogLevel.OFF)
    val task = config.loadConfig(classOf[PluginTask])
    FluentdOutputPlugin.taskCountOpt = Some(taskCount)
    control.run(task.dump())
    FluentdOutputPlugin.sender.foreach(_.close())
    Exec.newConfigDiff
  }

  override def resume(taskSource: TaskSource,
                      schema: Schema,
                      taskCount: Int,
                      control: OutputPlugin.Control): ConfigDiff =
    throw new UnsupportedOperationException("fluentd output plugin does not support resuming")

  override def cleanup(taskSource: TaskSource,
                       schema: Schema,
                       taskCount: Int,
                       successTaskReports: util.List[TaskReport]): Unit = {}

  override def open(taskSource: TaskSource, schema: Schema, taskIndex: Int): TransactionalPageOutput = {
    FluentdOutputPlugin.sender.synchronized {
      FluentdOutputPlugin.sender match {
        case Some(sender) =>
          FluentdTransactionalPageOutput(taskSource, schema, taskIndex, FluentdOutputPlugin.taskCountOpt, sender)
        case None =>
          val task = taskSource.loadTask(classOf[PluginTask])
          SenderBuilder(task).withSession { session =>
            val sender = session.build[Sender]
            FluentdOutputPlugin.sender = Option(sender)
            FluentdTransactionalPageOutput(taskSource, schema, taskIndex, FluentdOutputPlugin.taskCountOpt, sender)
          }
      }
    }
  }
}

object FluentdOutputPlugin {
  var sender: Option[Sender]    = None
  var taskCountOpt: Option[Int] = None
}

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
embulk-output-fluentd-0.1.0 src/main/scala/org/embulk/output/fluentd/FluentdOutputPlugin.scala