Sha256: 08b9ea69a7ffb5908c1af784f0fa0e557bec06f08d9bb154ada522850541f5d9
Contents?: true
Size: 1.95 KB
Versions: 1
Compression:
Stored size: 1.95 KB
Contents
package org.embulk.output.key_to_redis import java.security.MessageDigest import com.google.common.base.Optional import org.embulk.config.{TaskReport, TaskSource} import org.embulk.output.key_to_redis.column._ import org.embulk.spi.time.TimestampFormatter import org.embulk.spi._ import org.bouncycastle.util.encoders.Hex import org.embulk.output.key_to_redis.redis.Redis import scala.collection.JavaConverters._ case class PageOutput(taskSource: TaskSource, schema: Schema, taskCountOpt: Option[Int], putAsMD5: Boolean) extends TransactionalPageOutput { val task: PluginTask = taskSource.loadTask(classOf[PluginTask]) val digestMd5: MessageDigest = MessageDigest.getInstance("MD5") def timestampFormatter(): TimestampFormatter = new TimestampFormatter(task, Optional.absent()) val redis: Redis = KeyToRedisOutputPlugin.redis.getOrElse(sys.error("could not find redis.")) override def add(page: Page): Unit = { val reader: PageReader = new PageReader(schema) reader.setPage(page) while (reader.nextRecord()) { val setValueVisitor = SetValueColumnVisitor( reader, timestampFormatter(), task.getKeyWithIndex.asScala.toMap, task.getJsonKeyWithIndex.asScala.toMap, task.getAppender) schema.visitColumns(setValueVisitor) val value = setValueVisitor.getValue if (value.nonEmpty) { if (putAsMD5) { val hash = Hex.toHexString( digestMd5.digest(setValueVisitor.getValue.getBytes())) redis.sadd(hash) } else { redis.sadd(setValueVisitor.getValue) } } } reader.close() } override def finish(): Unit = { // for map/reduce executor. if (taskCountOpt.isEmpty) { // close immediately. redis.close() } } override def close(): Unit = () override def commit(): TaskReport = Exec.newTaskReport override def abort(): Unit = () }
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
embulk-output-key_to_redis-0.1.2 | src/main/scala/org/embulk/output/key_to_redis/PageOutput.scala |