Sha256: e07568b8ed5c6a2ed7399ff926edc7df808b2bce2360cc70262611bc5a901304
Contents?: true
Size: 1.3 KB
Versions: 1
Compression:
Stored size: 1.3 KB
Contents
package org.embulk.output.fluentd.sender import akka.NotUsed import akka.actor.ActorSystem import akka.stream.scaladsl.{Flow, Tcp} import akka.stream.scaladsl.Tcp.OutgoingConnection import akka.util.ByteString import org.velvia.MsgPack import scala.concurrent.Future trait SenderFlow { val msgPackFlow: Flow[Seq[Seq[Map[String, AnyRef]]], (Int, ByteString), NotUsed] def tcpConnectionFlow(host: String, port: Int)( implicit s: ActorSystem): Flow[ByteString, ByteString, Future[OutgoingConnection]] } case class SenderFlowImpl private[sender] (tag: String, unixtime: Long, timeKeyOpt: Option[String]) extends SenderFlow { override val msgPackFlow: Flow[Seq[Seq[Map[String, AnyRef]]], (Int, ByteString), NotUsed] = Flow[Seq[Seq[Map[String, AnyRef]]]].map { value => val packing = value.flatten.map { v => val eventTime = for { timeKey <- timeKeyOpt timeValue <- v.get(timeKey) } yield timeValue.toString.toLong val logTime = eventTime.getOrElse(unixtime) Seq(logTime, v) } (packing.size, ByteString(MsgPack.pack(Seq(tag, packing)))) } override def tcpConnectionFlow(host: String, port: Int)( implicit s: ActorSystem): Flow[ByteString, ByteString, Future[OutgoingConnection]] = Tcp().outgoingConnection(host, port) }
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
embulk-output-fluentd-0.1.0 | src/main/scala/org/embulk/output/fluentd/sender/SenderFlow.scala |