Sha256: a3cd685429bf7a8ac49d3b85ffc6cc462d4f7020898fe2dadcabea942dd9fc61
Contents?: true
Size: 1.43 KB
Versions: 1
Compression:
Stored size: 1.43 KB
Contents
package org.embulk.filter.key_in_redis import org.embulk.config.{ConfigSource, TaskSource} import org.embulk.filter.key_in_redis.redis.Redis import org.embulk.spi import org.embulk.spi._ class KeyInRedisFilterPlugin extends FilterPlugin { override def transaction(config: ConfigSource, inputSchema: Schema, control: FilterPlugin.Control): Unit = { val task = config.loadConfig(classOf[PluginTask]) KeyInRedisFilterPlugin.createRedisInstance(task) KeyInRedisFilterPlugin.redis.foreach(_.ping()) control.run(task.dump(), inputSchema) KeyInRedisFilterPlugin.redis.foreach(_.close()) } override def open(taskSource: TaskSource, inputSchema: Schema, outputSchema: Schema, output: spi.PageOutput): PageOutput = { val task = taskSource.loadTask(classOf[PluginTask]) KeyInRedisFilterPlugin.redis match { case Some(_) => // nothing to do case None => // for map reduce executor. KeyInRedisFilterPlugin.createRedisInstance(task) } PageOutput(task, outputSchema, output) } } object KeyInRedisFilterPlugin { var redis: Option[Redis] = None def createRedisInstance(task: PluginTask): Unit = { KeyInRedisFilterPlugin.redis = Some( Redis(task.getRedisSetKey, task.getHost, task.getPort, { if (task.getDb.isPresent) Some(task.getDb.get()) else None })) } }
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
embulk-filter-key_in_redis-0.1.0 | src/main/scala/org/embulk/filter/key_in_redis/KeyInRedisFilterPlugin.scala |