Sha256: 0e10165423e31942c99433be16342e2e71cbb5005a913bdc00cf90d49bdc7ffc

Contents?: true

Size: 934 Bytes

Versions: 1

Compression:

Stored size: 934 Bytes

Contents

package org.embulk.output.key_to_redis.redis

import akka.actor.{Actor, ActorSystem}
import redis.RedisClient

import scala.collection.mutable.ListBuffer
import scala.concurrent.Future

case class Sender(setKey: String, redis: RedisClient) extends Actor {

  implicit val actorSystem: ActorSystem = context.system
  val buffer = new ListBuffer[String]
  val command = new ListBuffer[Future[Long]]

  override def receive: Receive = {
    case Message(v) =>
      buffer.append(v)
      // bulk insert
      if (buffer.size == 10000) {
        command.append(redis.sadd(setKey, buffer: _*))
        buffer.clear()
      }
    case Close =>
      command.append(redis.sadd(setKey, buffer: _*))
      buffer.clear()
    case GetStatus =>
      sender() ! Result(command.forall(_.isCompleted))
  }
}

case object GetStatus
case class Result(finished: Boolean) extends AnyVal
case object Close
case class Message(v: String) extends AnyVal

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
embulk-output-key_to_redis-0.1.2 src/main/scala/org/embulk/output/key_to_redis/redis/Sender.scala