Sha256: 0347e87052d980ccf3f916d3063482ecf80b76e675699c56ac085409ddec8526

Contents?: true

Size: 1006 Bytes

Versions: 1

Compression:

Stored size: 1006 Bytes

Contents

package org.embulk.filter.key_in_redis.redis

import redis.RedisClient

import scala.concurrent.duration._
import scala.concurrent._
import scala.util._

/**
  * Blocking wrapper around a rediscala client that tests whether values are
  * members of a single Redis set.
  *
  * Constructing an instance creates an actor system named "redis-client" and a
  * Redis client bound to it; call `close()` to release both.
  *
  * @param setKey key of the Redis set queried by `exists`
  * @param host   Redis server host
  * @param port   Redis server port
  * @param db     optional database handed to the client
  */
case class Redis(setKey: String, host: String, port: Int, db: Option[Int]) {
  // Implicit definitions carry an explicit type so resolution never depends on
  // inference. The plugin's own class loader is handed to the actor system.
  implicit val actorSystem: akka.actor.ActorSystem = akka.actor.ActorSystem(
    "redis-client",
    classLoader = Some(this.getClass.getClassLoader))
  val redis = RedisClient(host, port, db = db)

  // Upper bound for every blocking wait on a Redis reply.
  private val awaitTimeout: FiniteDuration = 10.minutes

  /**
    * Sends PING and blocks until the reply arrives.
    *
    * If the call fails (including a timeout of the wait), the actor system is
    * shut down before the exception is rethrown, so a failed connectivity
    * check does not leave the actor system running.
    *
    * @return the server's reply
    * @throws Throwable whatever the ping future failed with, or the timeout
    *                   raised by the blocking wait
    */
  def ping(): String = {
    import scala.util.control.NonFatal
    // Cleanup happens on the calling thread, in a defined order relative to
    // the rethrow; throwing from an onComplete callback would never reach the
    // caller and would only be reported by the execution context.
    try Await.result(redis.ping(), awaitTimeout)
    catch {
      case NonFatal(t) =>
        actorSystem.shutdown()
        throw t
    }
  }

  /**
    * Blocks until Redis answers whether `value` is a member of the set stored
    * at `setKey`.
    *
    * @param value candidate member
    * @return `true` when SISMEMBER reports membership
    */
  def exists(value: String): Boolean =
    Await.result(redis.sismember(setKey, value), awaitTimeout)

  /** Negation of `exists`. */
  def nonExists(value: String): Boolean = !exists(value)

  /**
    * Stops the Redis client, waits one second, then shuts the actor system
    * down. The shutdown runs even when stopping the client throws or the wait
    * is interrupted.
    */
  def close(): Unit = {
    try {
      redis.stop()
      // Give the client time to stop before the actor system goes away.
      Thread.sleep(1000)
    } finally {
      actorSystem.shutdown()
    }
  }

}

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
embulk-filter-key_in_redis-0.1.0 src/main/scala/org/embulk/filter/key_in_redis/redis/Redis.scala