Sha256: fd032b1eed508fcf62b69fe5338bc62da064e7a079922e29f5a0a96904b838cb

Contents?: true

Size: 1.38 KB

Versions: 1

Compression:

Stored size: 1.38 KB

Contents

package org.embulk.output.fluentd.sender

import akka.actor._
import org.slf4j.Logger

class SuperVisor extends Actor {
  var complete = 0
  var failed   = 0
  var retried  = 0
  var counter  = 0
  var closed   = false
  override def receive: Receive = {
    case Record(v) =>
      counter = counter + v
    case Complete(v) =>
      complete = complete + v
    case Failed(v)  => failed = failed + v
    case Retried(v) => retried = retried + v
    case GetStatus =>
      if (failed == 0) {
        sender() ! Result(counter, complete, failed, retried)
      } else {
        sender() ! Stop(counter, complete, failed, retried)
      }
    case Close =>
      val result = ClosedStatus(closed)
      if (!closed) {
        closed = true
      }
      sender() ! result
    case LogStatus(logger) =>
      logger.info(
        s"$counter was queued and $complete records was completed. $failed records was failed and retried $retried records.")
  }
}

case class Result(record: Int, complete: Int, failed: Int, retried: Int)
case object GetStatus
case class Stop(record: Int, complete: Int, failed: Int, retried: Int)
case object Close
case class ClosedStatus(alreadyClosed: Boolean)
case class LogStatus(logger: Logger)
case class Record(count: Int)   extends AnyVal
case class Complete(count: Int) extends AnyVal
case class Failed(count: Int)   extends AnyVal
case class Retried(count: Int)  extends AnyVal

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
embulk-output-fluentd-0.1.0 src/main/scala/org/embulk/output/fluentd/sender/SuperVisor.scala