Sha256: e07568b8ed5c6a2ed7399ff926edc7df808b2bce2360cc70262611bc5a901304

Contents?: true

Size: 1.3 KB

Versions: 1

Compression:

Stored size: 1.3 KB

Contents

package org.embulk.output.fluentd.sender

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Tcp}
import akka.stream.scaladsl.Tcp.OutgoingConnection
import akka.util.ByteString
import org.velvia.MsgPack

import scala.concurrent.Future

/** Stream stages used by the sender: one that serializes record batches and
  * one that provides an outgoing TCP connection.
  */
trait SenderFlow {
  /** Flow that turns a nested batch of records (each record a `Map[String, AnyRef]`)
    * into a pair of an `Int` and the serialized bytes.
    *
    * In [[SenderFlowImpl]] the `Int` is the number of records after flattening the
    * nested batch, and the bytes are the MessagePack encoding of that batch.
    */
  val msgPackFlow: Flow[Seq[Seq[Map[String, AnyRef]]], (Int, ByteString), NotUsed]
  /** Flow of bytes over an outgoing TCP connection.
    *
    * @param host remote host to connect to
    * @param port remote port to connect to
    * @param s    actor system required to build the connection flow
    * @return a flow whose materialized value is a future of the connection details
    */
  def tcpConnectionFlow(host: String, port: Int)(
      implicit s: ActorSystem): Flow[ByteString, ByteString, Future[OutgoingConnection]]
}

/** Default [[SenderFlow]]: packs records with MessagePack and connects over plain TCP.
  *
  * @param tag        tag placed at the head of every packed message
  * @param unixtime   fallback event time used when a record carries no usable time value
  * @param timeKeyOpt optional name of the record field holding the event time; when
  *                   `None`, every record is stamped with `unixtime`
  */
case class SenderFlowImpl private[sender] (tag: String, unixtime: Long, timeKeyOpt: Option[String])
    extends SenderFlow {

  /** Converts a raw time-field value to a `Long`.
    *
    *  - `null` is treated as "no time present" (previously this threw a
    *    `NullPointerException` via `null.toString`).
    *  - Boxed numbers are read with `longValue`, so a `Double`/`Float` is truncated
    *    instead of failing on a string form such as `"1.5E9"`.
    *  - Anything else is parsed from its string form, exactly as before; an
    *    unparseable value still throws `NumberFormatException` so bad data is not hidden.
    */
  private def toEpoch(raw: AnyRef): Option[Long] = raw match {
    case null                => None
    case n: java.lang.Number => Some(n.longValue)
    case other               => Some(other.toString.toLong)
  }

  /** Flattens the nested batch, pairs each record with its event time and packs
    * `[tag, [[time, record], ...]]` with MessagePack.
    *
    * NOTE(review): this layout looks like Fluentd's forward-protocol "Forward mode"
    * message — confirm against the protocol spec before changing it.
    *
    * Emits the number of packed records together with the serialized bytes.
    */
  override val msgPackFlow: Flow[Seq[Seq[Map[String, AnyRef]]], (Int, ByteString), NotUsed] =
    Flow[Seq[Seq[Map[String, AnyRef]]]].map { value =>
      val packing = value.flatten.map { v =>
        val eventTime = for {
          timeKey   <- timeKeyOpt
          timeValue <- v.get(timeKey)
          epoch     <- toEpoch(timeValue)
        } yield epoch
        // Missing key, missing field, or null value all fall back to the configured time.
        val logTime = eventTime.getOrElse(unixtime)
        Seq(logTime, v)
      }
      (packing.size, ByteString(MsgPack.pack(Seq(tag, packing))))
    }

  /** Builds the outgoing TCP connection flow to `host:port` via Akka's `Tcp` extension.
    *
    * @param host remote host to connect to
    * @param port remote port to connect to
    * @param s    actor system used to look up the `Tcp` extension
    */
  override def tcpConnectionFlow(host: String, port: Int)(
      implicit s: ActorSystem): Flow[ByteString, ByteString, Future[OutgoingConnection]] =
    Tcp().outgoingConnection(host, port)

}

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
embulk-output-fluentd-0.1.0 src/main/scala/org/embulk/output/fluentd/sender/SenderFlow.scala