Sha256: 0e10165423e31942c99433be16342e2e71cbb5005a913bdc00cf90d49bdc7ffc
Contents?: true
Size: 934 Bytes
Versions: 1
Compression:
Stored size: 934 Bytes
Contents
package org.embulk.output.key_to_redis.redis import akka.actor.{Actor, ActorSystem} import redis.RedisClient import scala.collection.mutable.ListBuffer import scala.concurrent.Future case class Sender(setKey: String, redis: RedisClient) extends Actor { implicit val actorSystem: ActorSystem = context.system val buffer = new ListBuffer[String] val command = new ListBuffer[Future[Long]] override def receive: Receive = { case Message(v) => buffer.append(v) // bulk insert if (buffer.size == 10000) { command.append(redis.sadd(setKey, buffer: _*)) buffer.clear() } case Close => command.append(redis.sadd(setKey, buffer: _*)) buffer.clear() case GetStatus => sender() ! Result(command.forall(_.isCompleted)) } } case object GetStatus case class Result(finished: Boolean) extends AnyVal case object Close case class Message(v: String) extends AnyVal
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
embulk-output-key_to_redis-0.1.2 | src/main/scala/org/embulk/output/key_to_redis/redis/Sender.scala |