src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala in embulk-input-dynamodb-0.1.1 vs src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala in embulk-input-dynamodb-0.2.0

- old
+ new

@@ -2,10 +2,11 @@ import java.util.{List => JList} import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient import org.embulk.config._ +import org.embulk.input.dynamodb.ope.{QueryOperation, ScanOperation} import org.embulk.spi._ class DynamodbInputPlugin extends InputPlugin { def transaction(config: ConfigSource, control: InputPlugin.Control): ConfigDiff = { val task: PluginTask = config.loadConfig(classOf[PluginTask]) @@ -22,11 +23,16 @@ } def run(taskSource: TaskSource, schema: Schema, taskIndex: Int, output: PageOutput): TaskReport = { val task: PluginTask = taskSource.loadTask(classOf[PluginTask]) - implicit val client: AmazonDynamoDBClient = DynamoDBUtil.createClient(task) - DynamoDBUtil.scan(task, schema, output) + val client: AmazonDynamoDBClient = DynamoDBClient.create(task) + + val ope = task.getOperation.toLowerCase match { + case "scan" => new ScanOperation(client) + case "query" => new QueryOperation(client) + } + ope.execute(task, schema, output) Exec.newTaskReport() } def cleanup(taskSource: TaskSource, schema: Schema, taskCount: Int, successTaskReports: JList[TaskReport]): Unit = {