src/test/java/org/embulk/input/s3/TestS3FileInputPlugin.java in embulk-input-s3-0.2.2 vs src/test/java/org/embulk/input/s3/TestS3FileInputPlugin.java in embulk-input-s3-0.2.3

- old
+ new

@@ -1,42 +1,221 @@ package org.embulk.input.s3; -import static org.junit.Assert.*; -import java.util.List; -import org.junit.Test; -import org.mockito.Mockito; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import org.embulk.EmbulkTestRuntime; +import org.embulk.config.ConfigDiff; +import org.embulk.config.ConfigSource; +import org.embulk.config.TaskReport; +import org.embulk.config.TaskSource; +import org.embulk.input.s3.AbstractS3FileInputPlugin.PluginTask; +import org.embulk.input.s3.AbstractS3FileInputPlugin.S3FileInput; +import org.embulk.input.s3.S3FileInputPlugin.S3PluginTask; +import org.embulk.spi.Exec; +import org.embulk.spi.FileInputPlugin; +import org.embulk.spi.util.LineDecoder; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + public class TestS3FileInputPlugin { + @Rule + public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); + + private ConfigSource config; + private S3FileInputPlugin plugin; + private AmazonS3Client client; + + @Before + public void createResources() + { + config = config(); + plugin = spy(runtime.getInstance(S3FileInputPlugin.class)); + client = mock(AmazonS3Client.class); + } + @Test + public void checkS3ClientCreatedSuccessfully() + { + PluginTask task = config().loadConfig(plugin.getTaskClass()); + plugin.newS3Client(task); + } + + @Test public void listS3FilesByPrefix() { // AWSS3Client returns list1 for the first iteration and list2 next. - List<S3ObjectSummary> list1 = ImmutableList.<S3ObjectSummary> of(bucket("in/", 0), bucket("in/file/", 0), - bucket("in/file/sample.csv.gz", 12345)); - List<S3ObjectSummary> list2 = ImmutableList.<S3ObjectSummary> of(bucket("sample2.csv.gz", 0)); - ObjectListing ol = Mockito.mock(ObjectListing.class); + List<S3ObjectSummary> list1 = s3objects("in/", 0L, "in/file/", 0L, "in/file/sample.csv.gz", 12345L); + List<S3ObjectSummary> list2 = s3objects("sample2.csv.gz", 0L); + ObjectListing ol = mock(ObjectListing.class); - Mockito.doReturn(list1).doReturn(list2).when(ol).getObjectSummaries(); - AmazonS3Client client = Mockito.mock(AmazonS3Client.class); - Mockito.doReturn(ol).when(client).listObjects(Mockito.any(ListObjectsRequest.class)); - Mockito.doReturn("in/file/").doReturn(null).when(ol).getNextMarker(); + doReturn(list1).doReturn(list2).when(ol).getObjectSummaries(); + doReturn(ol).when(client).listObjects(any(ListObjectsRequest.class)); + doReturn("in/file/").doReturn(null).when(ol).getNextMarker(); // It counts only size != 0 files. assertEquals(1, S3FileInputPlugin.listS3FilesByPrefix(client, "bucketName", "prefix", Optional.<String>absent()).size()); } - private S3ObjectSummary bucket(String key, long size) + @Test + public void checkLastPath() { - S3ObjectSummary bucket = new S3ObjectSummary(); - bucket.setKey(key); - bucket.setSize(size); - return bucket; + doReturn(null).when(client).listObjects(any(ListObjectsRequest.class)); + doReturn(client).when(plugin).newS3Client(any(PluginTask.class)); + + { // set a last file to last_path + ObjectListing listing = listing("in/aa", 0L, "in/aa/a", 3L, "in/aa/b", 2L, "in/aa/c", 1L); + doReturn(listing).when(client).listObjects(any(ListObjectsRequest.class)); + + ConfigDiff configDiff = plugin.transaction(config, new FileInputPlugin.Control() { + @Override + public List<TaskReport> run(TaskSource taskSource, int taskCount) + { + assertEquals(3, taskCount); + List<String> files = taskSource.loadTask(S3PluginTask.class).getFiles(); + assertArrayEquals(new String[]{"in/aa/a", "in/aa/b", "in/aa/c"}, files.toArray(new String[files.size()])); + return emptyTaskReports(taskCount); + } + }); + + assertEquals("in/aa/c", configDiff.get(String.class, "last_path")); + } + + { // if files are empty and last_path is not specified, last_path is not set. + ObjectListing listing = listing("in/aa", 0L); + doReturn(listing).when(client).listObjects(any(ListObjectsRequest.class)); + + ConfigDiff configDiff = plugin.transaction(config, new FileInputPlugin.Control() { + @Override + public List<TaskReport> run(TaskSource taskSource, int taskCount) + { + assertEquals(0, taskCount); + assertTrue(taskSource.loadTask(S3PluginTask.class).getFiles().isEmpty()); + return emptyTaskReports(taskCount); + } + }); + + assertFalse(configDiff.has("last_path")); + } + + { // if files are empty, keep the previous last_path. + config.set("last_path", "in/bb"); + + ObjectListing listing = listing("in/aa", 0L); + doReturn(listing).when(client).listObjects(any(ListObjectsRequest.class)); + + ConfigDiff configDiff = plugin.transaction(config, new FileInputPlugin.Control() { + @Override + public List<TaskReport> run(TaskSource taskSource, int taskCount) { + assertEquals(0, taskCount); + assertTrue(taskSource.loadTask(S3PluginTask.class).getFiles().isEmpty()); + return emptyTaskReports(taskCount); + } + }); + + assertEquals("in/bb", configDiff.get(String.class, "last_path")); + } + } + + @Test + public void checkS3FileInputByOpen() + throws Exception + { + doReturn(s3object("in/aa/a", "aa")).when(client).getObject(any(GetObjectRequest.class)); + doReturn(client).when(plugin).newS3Client(any(PluginTask.class)); + + PluginTask task = config.loadConfig(plugin.getTaskClass()); + task.setFiles(Arrays.asList(new String[]{"in/aa/a"})); + + StringBuilder sbuf = new StringBuilder(); + try (S3FileInput input = (S3FileInput) plugin.open(task.dump(), 0)) { + LineDecoder d = new LineDecoder(input, config.loadConfig(LineDecoder.DecoderTask.class)); + while (d.nextFile()) { + sbuf.append(d.poll()); + } + } + assertEquals("aa", sbuf.toString()); + } + + public static ConfigSource config() + { + return Exec.newConfigSource() + .set("bucket", "my_bucket") + .set("path_prefix", "my_path_prefix") + .set("access_key_id", "my_access_key_id") + .set("secret_access_key", "my_secret_access_key"); + } + + static ObjectListing listing(Object... keySizes) + { + ObjectListing listing = mock(ObjectListing.class); + if (keySizes == null) { + return listing; + } + + List<S3ObjectSummary> s3objects = s3objects(keySizes); + doReturn(s3objects).when(listing).getObjectSummaries(); + doReturn(null).when(listing).getNextMarker(); + return listing; + } + + static List<S3ObjectSummary> s3objects(Object... keySizes) + { + ImmutableList.Builder<S3ObjectSummary> builder = new ImmutableList.Builder<>(); + if (keySizes == null) { + return builder.build(); + } + + for (int i = 0; i < keySizes.length; i += 2) { + String key = (String) keySizes[i]; + long size = (Long) keySizes[i + 1]; + builder.add(s3object(key, size)); + } + return builder.build(); + } + + static S3ObjectSummary s3object(String key, long size) + { + S3ObjectSummary o = new S3ObjectSummary(); + o.setKey(key); + o.setSize(size); + return o; + } + + static S3Object s3object(String key, String value) + { + S3Object o = new S3Object(); + o.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(value.getBytes()), null)); + ObjectMetadata om = new ObjectMetadata(); + om.setContentLength(value.length()); + o.setObjectMetadata(om); + return o; + } + + static List<TaskReport> emptyTaskReports(int taskCount) + { + ImmutableList.Builder<TaskReport> reports = new ImmutableList.Builder<>(); + for (int i = 0; i < taskCount; i++) { + reports.add(Exec.newTaskReport()); + } + return reports.build(); } }