Sha256: 36170dfa26ee465356a3bace619a61d5d4644f5a3db903fb7ef34fcdeb39e9d0

Contents?: true

Size: 1.79 KB

Versions: 1

Compression:

Stored size: 1.79 KB

Contents

package org.embulk.filter.key_in_redis

import java.security.MessageDigest

import com.google.common.base.Optional
import org.bouncycastle.util.encoders.Hex
import org.embulk.filter.key_in_redis.column._

import scala.collection.JavaConverters._
import org.embulk.spi.time.TimestampFormatter
import org.embulk.spi.{
  Exec,
  Page,
  PageBuilder,
  PageReader,
  Schema,
  PageOutput => EmbulkPageOutput
}

/**
  * Embulk page output that forwards a record to `output` only when
  * `redis.nonExists` reports true for the record's match value.
  *
  * For every record of an incoming page the match value is assembled by a
  * `SetValueColumnVisitor` from the columns configured on `task`. When
  * `task.getMatchAsMD5` is set, the value is replaced by the lower-case hex
  * MD5 digest of its UTF-8 bytes before the Redis lookup.
  *
  * NOTE(review): `digestMd5` is a single shared `MessageDigest`; this class
  * presumably relies on `add` being called from one thread at a time —
  * confirm against the Embulk executor in use.
  *
  * @param task   plugin configuration (key columns, appender, MD5 flag, timestamp settings)
  * @param schema schema used both for reading pages and for building output pages
  * @param output downstream page output that receives the passed-through records
  */
case class PageOutput(task: PluginTask,
                      schema: Schema,
                      output: EmbulkPageOutput)
    extends EmbulkPageOutput {
  val pageBuilder: PageBuilder =
    new PageBuilder(Exec.getBufferAllocator, schema, output)

  /** Creates a new timestamp formatter configured from `task`. */
  def timestampFormatter(): TimestampFormatter =
    new TimestampFormatter(task, Optional.absent())

  val digestMd5: MessageDigest = MessageDigest.getInstance("MD5")

  /**
    * Reads every record of `page` and appends the ones whose match value
    * satisfies `redis.nonExists` to the page builder.
    *
    * The reader is closed even when record processing throws.
    *
    * @param page the input page to filter
    */
  override def add(page: Page): Unit = {
    val reader: PageReader = new PageReader(schema)
    try {
      reader.setPage(page)

      // These values do not change between records, so build them once per
      // page instead of once per record.
      val formatter = timestampFormatter()
      val keyWithIndex = task.getKeyWithIndex.asScala.toMap
      val jsonKeyWithIndex = task.getJsonKeyWithIndex.asScala.toMap
      val appender = task.getAppender
      val matchAsMd5 = task.getMatchAsMD5

      while (reader.nextRecord()) {
        // A fresh visitor per record: its value is read back right after the
        // column visit.
        val setValueVisitor = SetValueColumnVisitor(reader,
                                                    formatter,
                                                    keyWithIndex,
                                                    jsonKeyWithIndex,
                                                    appender)
        schema.visitColumns(setValueVisitor)

        val rawValue = setValueVisitor.getValue
        val matchValue =
          if (matchAsMd5) {
            // Pin the charset: the platform default would make the digest of
            // non-ASCII keys depend on the JVM's locale settings.
            Hex.toHexString(
              digestMd5.digest(
                rawValue.getBytes(java.nio.charset.StandardCharsets.UTF_8)))
          } else rawValue

        // NOTE(review): when `KeyInRedisFilterPlugin.redis` yields nothing,
        // no record is passed through — confirm this is intended.
        KeyInRedisFilterPlugin.redis.foreach { redis =>
          val passthroughColumnVisitor =
            PassthroughColumnVisitor(reader, pageBuilder)
          if (redis.nonExists(matchValue)) {
            schema.visitColumns(passthroughColumnVisitor)
            passthroughColumnVisitor.addRecord()
          }
        }
      }
    } finally {
      reader.close()
    }
  }

  /** Delegates to `pageBuilder.finish()`. */
  override def finish(): Unit = pageBuilder.finish()

  /** Delegates to `pageBuilder.close()`. */
  override def close(): Unit = pageBuilder.close()

}

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
embulk-filter-key_in_redis-0.1.0 src/main/scala/org/embulk/filter/key_in_redis/PageOutput.scala