src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala in embulk-input-dynamodb-0.2.0 vs src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala in embulk-input-dynamodb-0.3.0
- old
+ new
@@ -1,47 +1,98 @@
package org.embulk.input.dynamodb
import java.util.{List => JList}
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
-import org.embulk.config._
-import org.embulk.input.dynamodb.ope.{QueryOperation, ScanOperation}
-import org.embulk.spi._
+import org.embulk.config.{ConfigDiff, ConfigSource, TaskReport, TaskSource}
+import org.embulk.input.dynamodb.aws.Aws
+import org.embulk.input.dynamodb.item.DynamodbItemSchema
+import org.embulk.input.dynamodb.operation.DynamodbOperationProxy
+import org.embulk.spi.{Exec, InputPlugin, PageBuilder, PageOutput, Schema}
class DynamodbInputPlugin extends InputPlugin {
- def transaction(config: ConfigSource, control: InputPlugin.Control): ConfigDiff = {
- val task: PluginTask = config.loadConfig(classOf[PluginTask])
- val schema: Schema = task.getColumns.toSchema
- val taskCount: Int = 1
+ override def transaction(
+ config: ConfigSource,
+ control: InputPlugin.Control
+ ): ConfigDiff = {
+ val task: PluginTask = PluginTask.load(config)
+ if (isDeprecatedOperationRequired(task))
+ return DeprecatedDynamodbInputPlugin.transaction(config, control)
- resume(task.dump(), schema, taskCount, control)
- }
+ val schema: Schema = DynamodbItemSchema(task).getEmbulkSchema
+ val taskCount: Int = DynamodbOperationProxy(task).getEmbulkTaskCount
- def resume(taskSource: TaskSource, schema: Schema, taskCount: Int, control: InputPlugin.Control): ConfigDiff = {
- control.run(taskSource, schema, taskCount)
+ control.run(task.dump(), schema, taskCount)
Exec.newConfigDiff()
}
- def run(taskSource: TaskSource, schema: Schema, taskIndex: Int, output: PageOutput): TaskReport = {
- val task: PluginTask = taskSource.loadTask(classOf[PluginTask])
+ override def resume(
+ taskSource: TaskSource,
+ schema: Schema,
+ taskCount: Int,
+ control: InputPlugin.Control
+ ): ConfigDiff = {
+ val task: PluginTask = PluginTask.load(taskSource)
+ if (isDeprecatedOperationRequired(task))
+ return DeprecatedDynamodbInputPlugin.resume(
+ taskSource,
+ schema,
+ taskCount,
+ control
+ )
+ throw new UnsupportedOperationException
+ }
- val client: AmazonDynamoDBClient = DynamoDBClient.create(task)
+ override def run(
+ taskSource: TaskSource,
+ schema: Schema,
+ taskIndex: Int,
+ output: PageOutput
+ ): TaskReport = {
+ val task: PluginTask = PluginTask.load(taskSource)
+ if (isDeprecatedOperationRequired(task))
+ return DeprecatedDynamodbInputPlugin.run(
+ taskSource,
+ schema,
+ taskIndex,
+ output
+ )
- val ope = task.getOperation.toLowerCase match {
- case "scan" => new ScanOperation(client)
- case "query" => new QueryOperation(client)
- }
- ope.execute(task, schema, output)
+ val pageBuilder = new PageBuilder(task.getBufferAllocator, schema, output)
+ Aws(task).withDynamodb { dynamodb =>
+ DynamodbOperationProxy(task).run(
+ dynamodb,
+ taskIndex,
+ DynamodbItemSchema(task).getItemsConsumer(pageBuilder)
+ )
+ }
+ pageBuilder.finish()
Exec.newTaskReport()
}
- def cleanup(taskSource: TaskSource, schema: Schema, taskCount: Int, successTaskReports: JList[TaskReport]): Unit = {
- // TODO
+ override def cleanup(
+ taskSource: TaskSource,
+ schema: Schema,
+ taskCount: Int,
+ successTaskReports: JList[TaskReport]
+ ): Unit = {
+ val task: PluginTask = PluginTask.load(taskSource)
+ if (isDeprecatedOperationRequired(task))
+ DeprecatedDynamodbInputPlugin.cleanup(
+ taskSource,
+ schema,
+ taskCount,
+ successTaskReports
+ )
}
- def guess(config: ConfigSource): ConfigDiff = {
- // TODO
- null
+ override def guess(config: ConfigSource): ConfigDiff = {
+ val task: PluginTask = PluginTask.load(config)
+ if (isDeprecatedOperationRequired(task))
+ return DeprecatedDynamodbInputPlugin.guess(config)
+ throw new UnsupportedOperationException
}
+
+ private def isDeprecatedOperationRequired(task: PluginTask): Boolean =
+ task.getOperation.isPresent
}