src/test/java/org/embulk/input/s3/TestS3FileInputPlugin.java in embulk-input-s3-0.2.4 vs src/test/java/org/embulk/input/s3/TestS3FileInputPlugin.java in embulk-input-s3-0.2.5

- old
+ new

@@ -1,236 +1,220 @@ package org.embulk.input.s3; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectInputStream; -import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.embulk.EmbulkTestRuntime; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; -import org.embulk.input.s3.AbstractS3FileInputPlugin.PluginTask; -import org.embulk.input.s3.AbstractS3FileInputPlugin.S3FileInput; -import org.embulk.input.s3.S3FileInputPlugin.S3PluginTask; -import org.embulk.spi.Exec; -import org.embulk.spi.FileInputPlugin; -import org.embulk.spi.util.LineDecoder; +import org.embulk.spi.FileInputRunner; +import org.embulk.spi.InputPlugin; +import org.embulk.spi.PageOutput; +import org.embulk.spi.Schema; +import org.embulk.spi.TestPageBuilderReader.MockPageOutput; +import org.embulk.spi.util.Pages; +import org.embulk.standards.CsvParserPlugin; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -import java.io.ByteArrayInputStream; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; -import static org.junit.Assert.*; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assume.assumeNotNull; public class TestS3FileInputPlugin { + private static String EMBULK_S3_TEST_BUCKET; + private static String EMBULK_S3_TEST_ACCESS_KEY_ID; + private static String EMBULK_S3_TEST_SECRET_ACCESS_KEY; + private static final String EMBULK_S3_TEST_PATH_PREFIX = "embulk_input_s3_test"; + + /* + * This test case requires environment variables: + * EMBULK_S3_TEST_BUCKET + * EMBULK_S3_TEST_ACCESS_KEY_ID + * EMBULK_S3_TEST_SECRET_ACCESS_KEY + * If the variables not set, the test case is skipped. + */ + @BeforeClass + public static void initializeConstantVariables() + { + EMBULK_S3_TEST_BUCKET = System.getenv("EMBULK_S3_TEST_BUCKET"); + EMBULK_S3_TEST_ACCESS_KEY_ID = System.getenv("EMBULK_S3_TEST_ACCESS_KEY_ID"); + EMBULK_S3_TEST_SECRET_ACCESS_KEY = System.getenv("EMBULK_S3_TEST_SECRET_ACCESS_KEY"); + assumeNotNull(EMBULK_S3_TEST_BUCKET, EMBULK_S3_TEST_ACCESS_KEY_ID, EMBULK_S3_TEST_SECRET_ACCESS_KEY); + } + @Rule public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); private ConfigSource config; - private S3FileInputPlugin plugin; - private AmazonS3Client client; + private FileInputRunner runner; + private MockPageOutput output; @Before public void createResources() { - config = config(); - plugin = spy(runtime.getInstance(S3FileInputPlugin.class)); - client = mock(AmazonS3Client.class); + config = runtime.getExec().newConfigSource() + .set("type", "s3") + .set("bucket", EMBULK_S3_TEST_BUCKET) + .set("access_key_id", EMBULK_S3_TEST_ACCESS_KEY_ID) + .set("secret_access_key", EMBULK_S3_TEST_SECRET_ACCESS_KEY) + .set("path_prefix", EMBULK_S3_TEST_PATH_PREFIX) + .set("parser", parserConfig(schemaConfig())); + runner = new FileInputRunner(runtime.getInstance(S3FileInputPlugin.class)); + output = new MockPageOutput(); } @Test - public void checkS3ClientCreatedSuccessfully() + public void simpleTest() { - PluginTask task = config().loadConfig(plugin.getTaskClass()); - plugin.newS3Client(task); + ConfigSource config = this.config.deepCopy(); + ConfigDiff configDiff = runner.transaction(config, new Control(runner, output)); + + assertEquals(EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv", configDiff.get(String.class, "last_path")); + assertRecords(config, output); } @Test - public void listS3FilesByPrefix() + public void useLastPath() + throws Exception { - // AWSS3Client returns list1 for the first iteration and list2 next. - List<S3ObjectSummary> list1 = s3objects("in/", 0L, "in/file/", 0L, "in/file/sample.csv.gz", 12345L); - List<S3ObjectSummary> list2 = s3objects("sample2.csv.gz", 0L); - ObjectListing ol = mock(ObjectListing.class); + ConfigSource config = this.config.deepCopy().set("last_path", EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv"); + ConfigDiff configDiff = runner.transaction(config, new Control(runner, output)); - doReturn(list1).doReturn(list2).when(ol).getObjectSummaries(); - doReturn(ol).when(client).listObjects(any(ListObjectsRequest.class)); - doReturn("in/file/").doReturn(null).when(ol).getNextMarker(); - - // It counts only size != 0 files. - FileList.Builder builder = new FileList.Builder(); - S3FileInputPlugin.listS3FilesByPrefix(builder, client, "bucketName", "prefix", Optional.<String>absent()); - assertEquals(1, builder.size()); + assertEquals(EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv", configDiff.get(String.class, "last_path")); + assertEquals(0, getRecords(config, output).size()); } @Test - public void checkLastPath() + public void emptyFilesWithLastPath() + throws Exception { - doReturn(null).when(client).listObjects(any(ListObjectsRequest.class)); - doReturn(client).when(plugin).newS3Client(any(PluginTask.class)); + ConfigSource config = this.config.deepCopy() + .set("path_prefix", "empty_files_prefix") + .set("last_path", EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv"); + ConfigDiff configDiff = runner.transaction(config, new Control(runner, output)); - { // set a last file to last_path - ObjectListing listing = listing("in/aa", 0L, "in/aa/a", 3L, "in/aa/b", 2L, "in/aa/c", 1L); - doReturn(listing).when(client).listObjects(any(ListObjectsRequest.class)); - - ConfigDiff configDiff = plugin.transaction(config, new FileInputPlugin.Control() { - @Override - public List<TaskReport> run(TaskSource taskSource, int taskCount) - { - assertEquals(3, taskCount); - List<String> files = fileListToList(taskSource.loadTask(S3PluginTask.class).getFiles()); - assertArrayEquals(new String[]{"in/aa/a", "in/aa/b", "in/aa/c"}, files.toArray(new String[files.size()])); - return emptyTaskReports(taskCount); - } - }); - - assertEquals("in/aa/c", configDiff.get(String.class, "last_path")); - } - - { // if files are empty and last_path is not specified, last_path is not set. - ObjectListing listing = listing("in/aa", 0L); - doReturn(listing).when(client).listObjects(any(ListObjectsRequest.class)); - - ConfigDiff configDiff = plugin.transaction(config, new FileInputPlugin.Control() { - @Override - public List<TaskReport> run(TaskSource taskSource, int taskCount) - { - assertEquals(0, taskCount); - assertTrue(fileListToList(taskSource.loadTask(S3PluginTask.class).getFiles()).isEmpty()); - return emptyTaskReports(taskCount); - } - }); - - assertEquals(null, configDiff.get(String.class, "last_path", null)); - } - - { // if files are empty, keep the previous last_path. - config.set("last_path", "in/bb"); - - ObjectListing listing = listing("in/aa", 0L); - doReturn(listing).when(client).listObjects(any(ListObjectsRequest.class)); - - ConfigDiff configDiff = plugin.transaction(config, new FileInputPlugin.Control() { - @Override - public List<TaskReport> run(TaskSource taskSource, int taskCount) { - assertEquals(0, taskCount); - assertTrue(fileListToList(taskSource.loadTask(S3PluginTask.class).getFiles()).isEmpty()); - return emptyTaskReports(taskCount); - } - }); - - assertEquals("in/bb", configDiff.get(String.class, "last_path")); - } + assertEquals(EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv", configDiff.get(String.class, "last_path")); // keep the last_path + assertEquals(0, getRecords(config, output).size()); } @Test - public void checkS3FileInputByOpen() + public void useTotalFileCountLimit() throws Exception { - doReturn(s3object("in/aa/a", "aa")).when(client).getObject(any(GetObjectRequest.class)); - doReturn(client).when(plugin).newS3Client(any(PluginTask.class)); + ConfigSource config = this.config.deepCopy().set("total_file_count_limit", 0); + ConfigDiff configDiff = runner.transaction(config, new Control(runner, output)); - PluginTask task = config.loadConfig(plugin.getTaskClass()); - FileList.Builder builder = new FileList.Builder(); - builder.add("in/aa/a", 100); - task.setFiles(builder.build()); - - StringBuilder sbuf = new StringBuilder(); - try (S3FileInput input = (S3FileInput) plugin.open(task.dump(), 0)) { - LineDecoder d = new LineDecoder(input, config.loadConfig(LineDecoder.DecoderTask.class)); - while (d.nextFile()) { - sbuf.append(d.poll()); - } - } - assertEquals("aa", sbuf.toString()); + assertNull(configDiff.get(String.class, "last_path")); + assertEquals(0, getRecords(config, output).size()); } - public static ConfigSource config() + @Test + public void usePathMatchPattern() + throws Exception { - return Exec.newConfigSource() - .set("bucket", "my_bucket") - .set("path_prefix", "my_path_prefix") - .set("access_key_id", "my_access_key_id") - .set("secret_access_key", "my_secret_access_key"); - } + ConfigSource config = this.config.deepCopy().set("path_match_pattern", "/match/"); + ConfigDiff configDiff = runner.transaction(config, new Control(runner, output)); - static ObjectListing listing(Object... keySizes) - { - ObjectListing listing = mock(ObjectListing.class); - if (keySizes == null) { - return listing; - } - - List<S3ObjectSummary> s3objects = s3objects(keySizes); - doReturn(s3objects).when(listing).getObjectSummaries(); - doReturn(null).when(listing).getNextMarker(); - return listing; + assertNull(configDiff.get(String.class, "last_path")); + assertEquals(0, getRecords(config, output).size()); } - static List<S3ObjectSummary> s3objects(Object... keySizes) + static class Control + implements InputPlugin.Control { - ImmutableList.Builder<S3ObjectSummary> builder = new ImmutableList.Builder<>(); - if (keySizes == null) { - return builder.build(); + private FileInputRunner runner; + private PageOutput output; + + Control(FileInputRunner runner, PageOutput output) + { + this.runner = runner; + this.output = output; } - for (int i = 0; i < keySizes.length; i += 2) { - String key = (String) keySizes[i]; - long size = (Long) keySizes[i + 1]; - builder.add(s3object(key, size)); + @Override + public List<TaskReport> run(TaskSource taskSource, Schema schema, int taskCount) + { + List<TaskReport> reports = new ArrayList<>(); + for (int i = 0; i < taskCount; i++) { + reports.add(runner.run(taskSource, schema, i, output)); + } + return reports; } - return builder.build(); } - static S3ObjectSummary s3object(String key, long size) + static ImmutableMap<String, Object> parserConfig(ImmutableList<Object> schemaConfig) { - S3ObjectSummary o = new S3ObjectSummary(); - o.setKey(key); - o.setSize(size); - return o; + ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>(); + builder.put("type", "csv"); + builder.put("newline", "CRLF"); + builder.put("delimiter", ","); + builder.put("quote", "\""); + builder.put("escape", "\""); + builder.put("trim_if_not_quoted", false); + builder.put("skip_header_lines", 0); + builder.put("allow_extra_columns", false); + builder.put("allow_optional_columns", false); + builder.put("columns", schemaConfig); + return builder.build(); } - static S3Object s3object(String key, String value) + static ImmutableList<Object> schemaConfig() { - S3Object o = new S3Object(); - o.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(value.getBytes()), null)); - ObjectMetadata om = new ObjectMetadata(); - om.setContentLength(value.length()); - o.setObjectMetadata(om); - return o; + ImmutableList.Builder<Object> builder = new ImmutableList.Builder<>(); + builder.add(ImmutableMap.of("name", "timestamp", "type", "timestamp", "format", "%Y-%m-%d %H:%M:%S")); + builder.add(ImmutableMap.of("name", "host", "type", "string")); + builder.add(ImmutableMap.of("name", "path", "type", "string")); + builder.add(ImmutableMap.of("name", "method", "type", "string")); + builder.add(ImmutableMap.of("name", "referer", "type", "string")); + builder.add(ImmutableMap.of("name", "code", "type", "long")); + builder.add(ImmutableMap.of("name", "agent", "type", "string")); + builder.add(ImmutableMap.of("name", "user", "type", "string")); + builder.add(ImmutableMap.of("name", "size", "type", "long")); + return builder.build(); } - static List<TaskReport> emptyTaskReports(int taskCount) + static void assertRecords(ConfigSource config, MockPageOutput output) { - ImmutableList.Builder<TaskReport> reports = new ImmutableList.Builder<>(); - for (int i = 0; i < taskCount; i++) { - reports.add(Exec.newTaskReport()); + List<Object[]> records = getRecords(config, output); + + assertEquals(2, records.size()); + { + Object[] record = records.get(0); + assertEquals("2014-10-02 22:15:39 UTC", record[0].toString()); + assertEquals("84.186.29.187", record[1]); + assertEquals("/category/electronics", record[2]); + assertEquals("GET", record[3]); + assertEquals("/category/music", record[4]); + assertEquals(200L, record[5]); + assertEquals("Mozilla/5.0", record[6]); + assertEquals("-", record[7]); + assertEquals(136L, record[8]); } - return reports.build(); + + { + Object[] record = records.get(1); + assertEquals("2014-10-02 22:15:01 UTC", record[0].toString()); + assertEquals("140.36.216.47", record[1]); + assertEquals("/category/music?from=10", record[2]); + assertEquals("GET", record[3]); + assertEquals("-", record[4]); + assertEquals(200L, record[5]); + assertEquals("Mozilla/5.0", record[6]); + assertEquals("-", record[7]); + assertEquals(70L, record[8]); + } } - private static List<String> fileListToList(FileList list) + static List<Object[]> getRecords(ConfigSource config, MockPageOutput output) { - ImmutableList.Builder<String> builder = ImmutableList.builder(); - for (int i=0; i < list.getTaskCount(); i++) { - for (String path : list.get(i)) { - builder.add(path); - } - } - return builder.build(); + Schema schema = config.getNested("parser").loadConfig(CsvParserPlugin.PluginTask.class).getSchemaConfig().toSchema(); + return Pages.toObjects(schema, output.pages); } }