Sha256: a3cd685429bf7a8ac49d3b85ffc6cc462d4f7020898fe2dadcabea942dd9fc61

Contents?: true

Size: 1.43 KB

Versions: 1

Compression:

Stored size: 1.43 KB

Contents

package org.embulk.filter.key_in_redis

import org.embulk.config.{ConfigSource, TaskSource}
import org.embulk.filter.key_in_redis.redis.Redis
import org.embulk.spi
import org.embulk.spi._

class KeyInRedisFilterPlugin extends FilterPlugin {

  override def transaction(config: ConfigSource,
                           inputSchema: Schema,
                           control: FilterPlugin.Control): Unit = {
    val task = config.loadConfig(classOf[PluginTask])
    KeyInRedisFilterPlugin.createRedisInstance(task)
    KeyInRedisFilterPlugin.redis.foreach(_.ping())
    control.run(task.dump(), inputSchema)
    KeyInRedisFilterPlugin.redis.foreach(_.close())
  }

  override def open(taskSource: TaskSource,
                    inputSchema: Schema,
                    outputSchema: Schema,
                    output: spi.PageOutput): PageOutput = {
    val task = taskSource.loadTask(classOf[PluginTask])
    KeyInRedisFilterPlugin.redis match {
      case Some(_) => // nothing to do
      case None => // for map reduce executor.
        KeyInRedisFilterPlugin.createRedisInstance(task)
    }
    PageOutput(task, outputSchema, output)
  }
}

object KeyInRedisFilterPlugin {
  var redis: Option[Redis] = None
  def createRedisInstance(task: PluginTask): Unit = {
    KeyInRedisFilterPlugin.redis = Some(
      Redis(task.getRedisSetKey, task.getHost, task.getPort, {
        if (task.getDb.isPresent) Some(task.getDb.get())
        else None
      }))
  }
}

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
embulk-filter-key_in_redis-0.1.0 src/main/scala/org/embulk/filter/key_in_redis/KeyInRedisFilterPlugin.scala