Sha256: bceac58b8c2b1201d123507cfeb19dd02af23ff98ad546a13f306e81b7cac5dd

Contents?: true

Size: 1.28 KB

Versions: 1

Compression:

Stored size: 1.28 KB

Contents

package org.embulk.output.fluentd.sender

import java.time.Instant

import akka.actor.ActorSystem
import org.embulk.output.fluentd.PluginTask
import org.embulk.spi.Exec
import wvlet.airframe.{Design, newDesign}
import com.typesafe.config.ConfigFactory

/**
  * Factory for the airframe [[Design]] that wires together the sender-side
  * components (`SenderFlow`, `ActorManager`, `Sender`) for one plugin task.
  */
object SenderBuilder {

  /**
    * Assembles the design from the settings carried by `task`.
    *
    * Every call looks up a logger for `Sender` and starts a fresh `ActorSystem`.
    * Both are declared implicit here; presumably the `*Impl` constructors below
    * consume them as implicit parameters (their signatures are not visible from
    * this file — confirm before removing either one).
    *
    * @param task plugin task supplying tag, time key, host, port and throughput settings
    * @return a design with `SenderFlow` and `ActorManager` bound to instances and
    *         `Sender` bound to a provider that receives both
    */
  def apply(task: PluginTask): Design = {
    implicit val logger = Exec.getLogger(classOf[Sender])
    implicit val system: ActorSystem = newActorSystem()

    // Convert the task's optional time key into a Scala Option.
    val configuredTimeKey = task.getTimeKey
    val maybeTimeKey =
      if (configuredTimeKey.isPresent) Some(configuredTimeKey.get())
      else None

    newDesign
      .bind[SenderFlow]
      .toInstance(
        // The current epoch second is passed alongside the tag and optional time key.
        SenderFlowImpl(task.getTag, Instant.now().getEpochSecond, maybeTimeKey)
      )
      .bind[ActorManager]
      .toInstance(ActorManagerImpl())
      .bind[Sender]
      .toProvider { (flow: SenderFlow, manager: ActorManager) =>
        SenderImpl(
          task.getHost,
          task.getPort,
          task.getRequestGroupingSize,
          task.getAsyncSize,
          flow,
          manager,
          task.getRequestPerSeconds,
          retryCount = 10
        )
      }
  }

  /**
    * Starts the "fluentd-sender" actor system, loading `application.conf` through
    * this object's own class loader and handing that same loader to Akka.
    */
  private def newActorSystem(): ActorSystem = {
    val loader = this.getClass.getClassLoader
    val config = ConfigFactory.load(loader, "application.conf")
    ActorSystem("fluentd-sender", config, loader)
  }

}

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
embulk-output-fluentd-0.1.0 src/main/scala/org/embulk/output/fluentd/sender/SenderBuilder.scala