Sha256: fd032b1eed508fcf62b69fe5338bc62da064e7a079922e29f5a0a96904b838cb
Contents?: true
Size: 1.38 KB
Versions: 1
Compression:
Stored size: 1.38 KB
Contents
package org.embulk.output.fluentd.sender

import akka.actor._
import org.slf4j.Logger

/**
  * Actor that accumulates record statistics from the messages it receives
  * and reports them back on request.
  *
  * Handled messages:
  *  - [[Record]], [[Complete]], [[Failed]], [[Retried]]: add the carried count
  *    to the corresponding running total.
  *  - [[GetStatus]]: replies to the sender with a [[Result]] while the failed
  *    total is zero, otherwise with a [[Stop]]; both carry all four totals.
  *  - [[Close]]: replies with a [[ClosedStatus]] holding the value the closed
  *    flag had before this message, then leaves the flag set.
  *  - [[LogStatus]]: writes the current totals to the supplied logger at info level.
  */
class SuperVisor extends Actor {

  // Running totals; each one only ever changes by the count carried in a message.
  var complete: Int = 0
  var failed: Int = 0
  var retried: Int = 0
  var counter: Int = 0

  // Set by the first Close message and never reset afterwards.
  var closed: Boolean = false

  override def receive: Receive = {
    case Record(queued) =>
      counter += queued

    case Complete(finished) =>
      complete += finished

    case Failed(errors) =>
      failed += errors

    case Retried(attempts) =>
      retried += attempts

    case GetStatus =>
      // The reply type depends solely on whether any failure has been counted.
      failed match {
        case 0 => sender() ! Result(counter, complete, failed, retried)
        case _ => sender() ! Stop(counter, complete, failed, retried)
      }

    case Close =>
      // Report the flag as it was on arrival, so only the first Close sees `false`.
      val alreadyClosed = closed
      closed = true
      sender() ! ClosedStatus(alreadyClosed)

    case LogStatus(logger) =>
      logger.info(
        s"$counter was queued and $complete records was completed. $failed records was failed and retried $retried records.")
  }
}

/** Reply to [[GetStatus]] sent while the failed total is zero; carries all four totals. */
case class Result(record: Int, complete: Int, failed: Int, retried: Int)

/** Request asking the [[SuperVisor]] for its current totals. */
case object GetStatus

/** Reply to [[GetStatus]] sent once the failed total is non-zero; carries all four totals. */
case class Stop(record: Int, complete: Int, failed: Int, retried: Int)

/** Request that sets the [[SuperVisor]] closed flag. */
case object Close

/** Reply to [[Close]]; `alreadyClosed` is the flag's value before that request was handled. */
case class ClosedStatus(alreadyClosed: Boolean)

/** Request to write the current totals to `logger`. */
case class LogStatus(logger: Logger)

/** Adds `count` to the queued-record total. */
case class Record(count: Int) extends AnyVal

/** Adds `count` to the completed total. */
case class Complete(count: Int) extends AnyVal

/** Adds `count` to the failed total. */
case class Failed(count: Int) extends AnyVal

/** Adds `count` to the retried total. */
case class Retried(count: Int) extends AnyVal
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
embulk-output-fluentd-0.1.0 | src/main/scala/org/embulk/output/fluentd/sender/SuperVisor.scala |