src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala in embulk-input-dynamodb-0.1.1 vs src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala in embulk-input-dynamodb-0.2.0
- old
+ new
@@ -2,10 +2,11 @@
import java.util.{List => JList}
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import org.embulk.config._
+import org.embulk.input.dynamodb.ope.{QueryOperation, ScanOperation}
import org.embulk.spi._
class DynamodbInputPlugin extends InputPlugin {
def transaction(config: ConfigSource, control: InputPlugin.Control): ConfigDiff = {
val task: PluginTask = config.loadConfig(classOf[PluginTask])
@@ -22,11 +23,16 @@
}
def run(taskSource: TaskSource, schema: Schema, taskIndex: Int, output: PageOutput): TaskReport = {
val task: PluginTask = taskSource.loadTask(classOf[PluginTask])
- implicit val client: AmazonDynamoDBClient = DynamoDBUtil.createClient(task)
- DynamoDBUtil.scan(task, schema, output)
+ val client: AmazonDynamoDBClient = DynamoDBClient.create(task)
+
+ val ope = task.getOperation.toLowerCase match {
+ case "scan" => new ScanOperation(client)
+ case "query" => new QueryOperation(client)
+ }
+ ope.execute(task, schema, output)
Exec.newTaskReport()
}
def cleanup(taskSource: TaskSource, schema: Schema, taskCount: Int, successTaskReports: JList[TaskReport]): Unit = {