Sha256: 08b9ea69a7ffb5908c1af784f0fa0e557bec06f08d9bb154ada522850541f5d9

Contents?: true

Size: 1.95 KB

Versions: 1

Compression:

Stored size: 1.95 KB

Contents

package org.embulk.output.key_to_redis

import java.security.MessageDigest

import com.google.common.base.Optional
import org.embulk.config.{TaskReport, TaskSource}
import org.embulk.output.key_to_redis.column._
import org.embulk.spi.time.TimestampFormatter
import org.embulk.spi._
import org.bouncycastle.util.encoders.Hex
import org.embulk.output.key_to_redis.redis.Redis

import scala.collection.JavaConverters._

/** Embulk [[TransactionalPageOutput]] that builds one string value per record
  * and hands it to `redis.sadd`, optionally as the hex MD5 digest of the value.
  *
  * @param taskSource   task configuration; loaded into a [[PluginTask]] on construction
  * @param schema       schema used both to read pages and to visit columns
  * @param taskCountOpt when empty, the Redis client is closed in [[finish]];
  *                     when defined, this class never closes it
  *                     (NOTE(review): presumably the plugin closes it elsewhere -- confirm)
  * @param putAsMD5     if true, store the hex-encoded MD5 of each value instead of the value
  */
case class PageOutput(taskSource: TaskSource,
                      schema: Schema,
                      taskCountOpt: Option[Int],
                      putAsMD5: Boolean)
    extends TransactionalPageOutput {
  val task: PluginTask = taskSource.loadTask(classOf[PluginTask])
  val digestMd5: MessageDigest = MessageDigest.getInstance("MD5")

  /** Creates a new formatter from the task configuration on every call. */
  def timestampFormatter(): TimestampFormatter =
    new TimestampFormatter(task, Optional.absent())

  // Fails fast at construction time when the plugin has no Redis client set.
  val redis: Redis =
    KeyToRedisOutputPlugin.redis.getOrElse(sys.error("could not find redis."))

  /** Reads every record of `page`, builds its value through
    * [[SetValueColumnVisitor]] and passes each non-empty value (or its MD5 hex
    * digest when `putAsMD5` is set) to `redis.sadd`.
    *
    * The page reader is closed even if visiting a column or the Redis call throws.
    */
  override def add(page: Page): Unit = {
    // These depend only on the task configuration, so build them once per
    // page rather than once per record.
    val formatter = timestampFormatter()
    val keyWithIndex = task.getKeyWithIndex.asScala.toMap
    val jsonKeyWithIndex = task.getJsonKeyWithIndex.asScala.toMap
    val appender = task.getAppender

    val reader: PageReader = new PageReader(schema)
    try {
      reader.setPage(page)
      while (reader.nextRecord()) {
        val visitor = SetValueColumnVisitor(reader,
                                            formatter,
                                            keyWithIndex,
                                            jsonKeyWithIndex,
                                            appender)
        schema.visitColumns(visitor)
        val value = visitor.getValue
        if (value.nonEmpty) {
          val member =
            if (putAsMD5) {
              // Hash the UTF-8 bytes explicitly: the platform default charset
              // would make the digest of non-ASCII values differ across hosts.
              Hex.toHexString(
                digestMd5.digest(
                  value.getBytes(java.nio.charset.StandardCharsets.UTF_8)))
            } else {
              value
            }
          redis.sadd(member)
        }
      }
    } finally {
      reader.close()
    }
  }

  /** Closes the Redis client right away, but only when `taskCountOpt` is empty. */
  override def finish(): Unit = {
    // for map/reduce executor.
    if (taskCountOpt.isEmpty) {
      // close immediately.
      redis.close()
    }
  }

  /** No-op: this output holds no per-instance resource to release here. */
  override def close(): Unit = ()

  /** Returns an empty task report. */
  override def commit(): TaskReport = Exec.newTaskReport

  /** No-op: nothing is rolled back on abort. */
  override def abort(): Unit = ()
}

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
embulk-output-key_to_redis-0.1.2 src/main/scala/org/embulk/output/key_to_redis/PageOutput.scala