src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala in embulk-input-dynamodb-0.2.0 vs src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala in embulk-input-dynamodb-0.3.0

- old
+ new

@@ -1,47 +1,98 @@ package org.embulk.input.dynamodb import java.util.{List => JList} -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient -import org.embulk.config._ -import org.embulk.input.dynamodb.ope.{QueryOperation, ScanOperation} -import org.embulk.spi._ +import org.embulk.config.{ConfigDiff, ConfigSource, TaskReport, TaskSource} +import org.embulk.input.dynamodb.aws.Aws +import org.embulk.input.dynamodb.item.DynamodbItemSchema +import org.embulk.input.dynamodb.operation.DynamodbOperationProxy +import org.embulk.spi.{Exec, InputPlugin, PageBuilder, PageOutput, Schema} class DynamodbInputPlugin extends InputPlugin { - def transaction(config: ConfigSource, control: InputPlugin.Control): ConfigDiff = { - val task: PluginTask = config.loadConfig(classOf[PluginTask]) - val schema: Schema = task.getColumns.toSchema - val taskCount: Int = 1 + override def transaction( + config: ConfigSource, + control: InputPlugin.Control + ): ConfigDiff = { + val task: PluginTask = PluginTask.load(config) + if (isDeprecatedOperationRequired(task)) + return DeprecatedDynamodbInputPlugin.transaction(config, control) - resume(task.dump(), schema, taskCount, control) - } + val schema: Schema = DynamodbItemSchema(task).getEmbulkSchema + val taskCount: Int = DynamodbOperationProxy(task).getEmbulkTaskCount - def resume(taskSource: TaskSource, schema: Schema, taskCount: Int, control: InputPlugin.Control): ConfigDiff = { - control.run(taskSource, schema, taskCount) + control.run(task.dump(), schema, taskCount) Exec.newConfigDiff() } - def run(taskSource: TaskSource, schema: Schema, taskIndex: Int, output: PageOutput): TaskReport = { - val task: PluginTask = taskSource.loadTask(classOf[PluginTask]) + override def resume( + taskSource: TaskSource, + schema: Schema, + taskCount: Int, + control: InputPlugin.Control + ): ConfigDiff = { + val task: PluginTask = PluginTask.load(taskSource) + if (isDeprecatedOperationRequired(task)) + return DeprecatedDynamodbInputPlugin.resume( + taskSource, + schema, + taskCount, + control + ) + throw new UnsupportedOperationException + } - val client: AmazonDynamoDBClient = DynamoDBClient.create(task) + override def run( + taskSource: TaskSource, + schema: Schema, + taskIndex: Int, + output: PageOutput + ): TaskReport = { + val task: PluginTask = PluginTask.load(taskSource) + if (isDeprecatedOperationRequired(task)) + return DeprecatedDynamodbInputPlugin.run( + taskSource, + schema, + taskIndex, + output + ) - val ope = task.getOperation.toLowerCase match { - case "scan" => new ScanOperation(client) - case "query" => new QueryOperation(client) - } - ope.execute(task, schema, output) + val pageBuilder = new PageBuilder(task.getBufferAllocator, schema, output) + Aws(task).withDynamodb { dynamodb => + DynamodbOperationProxy(task).run( + dynamodb, + taskIndex, + DynamodbItemSchema(task).getItemsConsumer(pageBuilder) + ) + } + pageBuilder.finish() Exec.newTaskReport() } - def cleanup(taskSource: TaskSource, schema: Schema, taskCount: Int, successTaskReports: JList[TaskReport]): Unit = { - // TODO + override def cleanup( + taskSource: TaskSource, + schema: Schema, + taskCount: Int, + successTaskReports: JList[TaskReport] + ): Unit = { + val task: PluginTask = PluginTask.load(taskSource) + if (isDeprecatedOperationRequired(task)) + DeprecatedDynamodbInputPlugin.cleanup( + taskSource, + schema, + taskCount, + successTaskReports + ) } - def guess(config: ConfigSource): ConfigDiff = { - // TODO - null + override def guess(config: ConfigSource): ConfigDiff = { + val task: PluginTask = PluginTask.load(config) + if (isDeprecatedOperationRequired(task)) + return DeprecatedDynamodbInputPlugin.guess(config) + throw new UnsupportedOperationException } + + private def isDeprecatedOperationRequired(task: PluginTask): Boolean = + task.getOperation.isPresent }