src/test/java/org/embulk/input/s3/TestS3FileInputPlugin.java in embulk-input-s3-0.2.4 vs src/test/java/org/embulk/input/s3/TestS3FileInputPlugin.java in embulk-input-s3-0.2.5
- old
+ new
@@ -1,236 +1,220 @@
package org.embulk.input.s3;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.ObjectListing;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectInputStream;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.embulk.EmbulkTestRuntime;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
-import org.embulk.input.s3.AbstractS3FileInputPlugin.PluginTask;
-import org.embulk.input.s3.AbstractS3FileInputPlugin.S3FileInput;
-import org.embulk.input.s3.S3FileInputPlugin.S3PluginTask;
-import org.embulk.spi.Exec;
-import org.embulk.spi.FileInputPlugin;
-import org.embulk.spi.util.LineDecoder;
+import org.embulk.spi.FileInputRunner;
+import org.embulk.spi.InputPlugin;
+import org.embulk.spi.PageOutput;
+import org.embulk.spi.Schema;
+import org.embulk.spi.TestPageBuilderReader.MockPageOutput;
+import org.embulk.spi.util.Pages;
+import org.embulk.standards.CsvParserPlugin;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
-import java.io.ByteArrayInputStream;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.List;
-import static org.junit.Assert.*;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assume.assumeNotNull;
public class TestS3FileInputPlugin
{
+ private static String EMBULK_S3_TEST_BUCKET;
+ private static String EMBULK_S3_TEST_ACCESS_KEY_ID;
+ private static String EMBULK_S3_TEST_SECRET_ACCESS_KEY;
+ private static final String EMBULK_S3_TEST_PATH_PREFIX = "embulk_input_s3_test";
+
+ /*
+ * This test case requires environment variables:
+ * EMBULK_S3_TEST_BUCKET
+ * EMBULK_S3_TEST_ACCESS_KEY_ID
+ * EMBULK_S3_TEST_SECRET_ACCESS_KEY
+ * If the variables are not set, the test case is skipped.
+ */
+ @BeforeClass
+ public static void initializeConstantVariables()
+ {
+ EMBULK_S3_TEST_BUCKET = System.getenv("EMBULK_S3_TEST_BUCKET");
+ EMBULK_S3_TEST_ACCESS_KEY_ID = System.getenv("EMBULK_S3_TEST_ACCESS_KEY_ID");
+ EMBULK_S3_TEST_SECRET_ACCESS_KEY = System.getenv("EMBULK_S3_TEST_SECRET_ACCESS_KEY");
+ assumeNotNull(EMBULK_S3_TEST_BUCKET, EMBULK_S3_TEST_ACCESS_KEY_ID, EMBULK_S3_TEST_SECRET_ACCESS_KEY);
+ }
+
@Rule
public EmbulkTestRuntime runtime = new EmbulkTestRuntime();
private ConfigSource config;
- private S3FileInputPlugin plugin;
- private AmazonS3Client client;
+ private FileInputRunner runner;
+ private MockPageOutput output;
@Before
public void createResources()
{
- config = config();
- plugin = spy(runtime.getInstance(S3FileInputPlugin.class));
- client = mock(AmazonS3Client.class);
+ config = runtime.getExec().newConfigSource()
+ .set("type", "s3")
+ .set("bucket", EMBULK_S3_TEST_BUCKET)
+ .set("access_key_id", EMBULK_S3_TEST_ACCESS_KEY_ID)
+ .set("secret_access_key", EMBULK_S3_TEST_SECRET_ACCESS_KEY)
+ .set("path_prefix", EMBULK_S3_TEST_PATH_PREFIX)
+ .set("parser", parserConfig(schemaConfig()));
+ runner = new FileInputRunner(runtime.getInstance(S3FileInputPlugin.class));
+ output = new MockPageOutput();
}
@Test
- public void checkS3ClientCreatedSuccessfully()
+ public void simpleTest()
{
- PluginTask task = config().loadConfig(plugin.getTaskClass());
- plugin.newS3Client(task);
+ ConfigSource config = this.config.deepCopy();
+ ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
+
+ assertEquals(EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv", configDiff.get(String.class, "last_path"));
+ assertRecords(config, output);
}
@Test
- public void listS3FilesByPrefix()
+ public void useLastPath()
+ throws Exception
{
- // AWSS3Client returns list1 for the first iteration and list2 next.
- List<S3ObjectSummary> list1 = s3objects("in/", 0L, "in/file/", 0L, "in/file/sample.csv.gz", 12345L);
- List<S3ObjectSummary> list2 = s3objects("sample2.csv.gz", 0L);
- ObjectListing ol = mock(ObjectListing.class);
+ ConfigSource config = this.config.deepCopy().set("last_path", EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv");
+ ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
- doReturn(list1).doReturn(list2).when(ol).getObjectSummaries();
- doReturn(ol).when(client).listObjects(any(ListObjectsRequest.class));
- doReturn("in/file/").doReturn(null).when(ol).getNextMarker();
-
- // It counts only size != 0 files.
- FileList.Builder builder = new FileList.Builder();
- S3FileInputPlugin.listS3FilesByPrefix(builder, client, "bucketName", "prefix", Optional.<String>absent());
- assertEquals(1, builder.size());
+ assertEquals(EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv", configDiff.get(String.class, "last_path"));
+ assertEquals(0, getRecords(config, output).size());
}
@Test
- public void checkLastPath()
+ public void emptyFilesWithLastPath()
+ throws Exception
{
- doReturn(null).when(client).listObjects(any(ListObjectsRequest.class));
- doReturn(client).when(plugin).newS3Client(any(PluginTask.class));
+ ConfigSource config = this.config.deepCopy()
+ .set("path_prefix", "empty_files_prefix")
+ .set("last_path", EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv");
+ ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
- { // set a last file to last_path
- ObjectListing listing = listing("in/aa", 0L, "in/aa/a", 3L, "in/aa/b", 2L, "in/aa/c", 1L);
- doReturn(listing).when(client).listObjects(any(ListObjectsRequest.class));
-
- ConfigDiff configDiff = plugin.transaction(config, new FileInputPlugin.Control() {
- @Override
- public List<TaskReport> run(TaskSource taskSource, int taskCount)
- {
- assertEquals(3, taskCount);
- List<String> files = fileListToList(taskSource.loadTask(S3PluginTask.class).getFiles());
- assertArrayEquals(new String[]{"in/aa/a", "in/aa/b", "in/aa/c"}, files.toArray(new String[files.size()]));
- return emptyTaskReports(taskCount);
- }
- });
-
- assertEquals("in/aa/c", configDiff.get(String.class, "last_path"));
- }
-
- { // if files are empty and last_path is not specified, last_path is not set.
- ObjectListing listing = listing("in/aa", 0L);
- doReturn(listing).when(client).listObjects(any(ListObjectsRequest.class));
-
- ConfigDiff configDiff = plugin.transaction(config, new FileInputPlugin.Control() {
- @Override
- public List<TaskReport> run(TaskSource taskSource, int taskCount)
- {
- assertEquals(0, taskCount);
- assertTrue(fileListToList(taskSource.loadTask(S3PluginTask.class).getFiles()).isEmpty());
- return emptyTaskReports(taskCount);
- }
- });
-
- assertEquals(null, configDiff.get(String.class, "last_path", null));
- }
-
- { // if files are empty, keep the previous last_path.
- config.set("last_path", "in/bb");
-
- ObjectListing listing = listing("in/aa", 0L);
- doReturn(listing).when(client).listObjects(any(ListObjectsRequest.class));
-
- ConfigDiff configDiff = plugin.transaction(config, new FileInputPlugin.Control() {
- @Override
- public List<TaskReport> run(TaskSource taskSource, int taskCount) {
- assertEquals(0, taskCount);
- assertTrue(fileListToList(taskSource.loadTask(S3PluginTask.class).getFiles()).isEmpty());
- return emptyTaskReports(taskCount);
- }
- });
-
- assertEquals("in/bb", configDiff.get(String.class, "last_path"));
- }
+ assertEquals(EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv", configDiff.get(String.class, "last_path")); // keep the last_path
+ assertEquals(0, getRecords(config, output).size());
}
@Test
- public void checkS3FileInputByOpen()
+ public void useTotalFileCountLimit()
throws Exception
{
- doReturn(s3object("in/aa/a", "aa")).when(client).getObject(any(GetObjectRequest.class));
- doReturn(client).when(plugin).newS3Client(any(PluginTask.class));
+ ConfigSource config = this.config.deepCopy().set("total_file_count_limit", 0);
+ ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
- PluginTask task = config.loadConfig(plugin.getTaskClass());
- FileList.Builder builder = new FileList.Builder();
- builder.add("in/aa/a", 100);
- task.setFiles(builder.build());
-
- StringBuilder sbuf = new StringBuilder();
- try (S3FileInput input = (S3FileInput) plugin.open(task.dump(), 0)) {
- LineDecoder d = new LineDecoder(input, config.loadConfig(LineDecoder.DecoderTask.class));
- while (d.nextFile()) {
- sbuf.append(d.poll());
- }
- }
- assertEquals("aa", sbuf.toString());
+ assertNull(configDiff.get(String.class, "last_path"));
+ assertEquals(0, getRecords(config, output).size());
}
- public static ConfigSource config()
+ @Test
+ public void usePathMatchPattern()
+ throws Exception
{
- return Exec.newConfigSource()
- .set("bucket", "my_bucket")
- .set("path_prefix", "my_path_prefix")
- .set("access_key_id", "my_access_key_id")
- .set("secret_access_key", "my_secret_access_key");
- }
+ ConfigSource config = this.config.deepCopy().set("path_match_pattern", "/match/");
+ ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
- static ObjectListing listing(Object... keySizes)
- {
- ObjectListing listing = mock(ObjectListing.class);
- if (keySizes == null) {
- return listing;
- }
-
- List<S3ObjectSummary> s3objects = s3objects(keySizes);
- doReturn(s3objects).when(listing).getObjectSummaries();
- doReturn(null).when(listing).getNextMarker();
- return listing;
+ assertNull(configDiff.get(String.class, "last_path"));
+ assertEquals(0, getRecords(config, output).size());
}
- static List<S3ObjectSummary> s3objects(Object... keySizes)
+ static class Control
+ implements InputPlugin.Control
{
- ImmutableList.Builder<S3ObjectSummary> builder = new ImmutableList.Builder<>();
- if (keySizes == null) {
- return builder.build();
+ private FileInputRunner runner;
+ private PageOutput output;
+
+ Control(FileInputRunner runner, PageOutput output)
+ {
+ this.runner = runner;
+ this.output = output;
}
- for (int i = 0; i < keySizes.length; i += 2) {
- String key = (String) keySizes[i];
- long size = (Long) keySizes[i + 1];
- builder.add(s3object(key, size));
+ @Override
+ public List<TaskReport> run(TaskSource taskSource, Schema schema, int taskCount)
+ {
+ List<TaskReport> reports = new ArrayList<>();
+ for (int i = 0; i < taskCount; i++) {
+ reports.add(runner.run(taskSource, schema, i, output));
+ }
+ return reports;
}
- return builder.build();
}
- static S3ObjectSummary s3object(String key, long size)
+ static ImmutableMap<String, Object> parserConfig(ImmutableList<Object> schemaConfig)
{
- S3ObjectSummary o = new S3ObjectSummary();
- o.setKey(key);
- o.setSize(size);
- return o;
+ ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
+ builder.put("type", "csv");
+ builder.put("newline", "CRLF");
+ builder.put("delimiter", ",");
+ builder.put("quote", "\"");
+ builder.put("escape", "\"");
+ builder.put("trim_if_not_quoted", false);
+ builder.put("skip_header_lines", 0);
+ builder.put("allow_extra_columns", false);
+ builder.put("allow_optional_columns", false);
+ builder.put("columns", schemaConfig);
+ return builder.build();
}
- static S3Object s3object(String key, String value)
+ static ImmutableList<Object> schemaConfig()
{
- S3Object o = new S3Object();
- o.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(value.getBytes()), null));
- ObjectMetadata om = new ObjectMetadata();
- om.setContentLength(value.length());
- o.setObjectMetadata(om);
- return o;
+ ImmutableList.Builder<Object> builder = new ImmutableList.Builder<>();
+ builder.add(ImmutableMap.of("name", "timestamp", "type", "timestamp", "format", "%Y-%m-%d %H:%M:%S"));
+ builder.add(ImmutableMap.of("name", "host", "type", "string"));
+ builder.add(ImmutableMap.of("name", "path", "type", "string"));
+ builder.add(ImmutableMap.of("name", "method", "type", "string"));
+ builder.add(ImmutableMap.of("name", "referer", "type", "string"));
+ builder.add(ImmutableMap.of("name", "code", "type", "long"));
+ builder.add(ImmutableMap.of("name", "agent", "type", "string"));
+ builder.add(ImmutableMap.of("name", "user", "type", "string"));
+ builder.add(ImmutableMap.of("name", "size", "type", "long"));
+ return builder.build();
}
- static List<TaskReport> emptyTaskReports(int taskCount)
+ static void assertRecords(ConfigSource config, MockPageOutput output)
{
- ImmutableList.Builder<TaskReport> reports = new ImmutableList.Builder<>();
- for (int i = 0; i < taskCount; i++) {
- reports.add(Exec.newTaskReport());
+ List<Object[]> records = getRecords(config, output);
+
+ assertEquals(2, records.size());
+ {
+ Object[] record = records.get(0);
+ assertEquals("2014-10-02 22:15:39 UTC", record[0].toString());
+ assertEquals("84.186.29.187", record[1]);
+ assertEquals("/category/electronics", record[2]);
+ assertEquals("GET", record[3]);
+ assertEquals("/category/music", record[4]);
+ assertEquals(200L, record[5]);
+ assertEquals("Mozilla/5.0", record[6]);
+ assertEquals("-", record[7]);
+ assertEquals(136L, record[8]);
}
- return reports.build();
+
+ {
+ Object[] record = records.get(1);
+ assertEquals("2014-10-02 22:15:01 UTC", record[0].toString());
+ assertEquals("140.36.216.47", record[1]);
+ assertEquals("/category/music?from=10", record[2]);
+ assertEquals("GET", record[3]);
+ assertEquals("-", record[4]);
+ assertEquals(200L, record[5]);
+ assertEquals("Mozilla/5.0", record[6]);
+ assertEquals("-", record[7]);
+ assertEquals(70L, record[8]);
+ }
}
- private static List<String> fileListToList(FileList list)
+ static List<Object[]> getRecords(ConfigSource config, MockPageOutput output)
{
- ImmutableList.Builder<String> builder = ImmutableList.builder();
- for (int i=0; i < list.getTaskCount(); i++) {
- for (String path : list.get(i)) {
- builder.add(path);
- }
- }
- return builder.build();
+ Schema schema = config.getNested("parser").loadConfig(CsvParserPlugin.PluginTask.class).getSchemaConfig().toSchema();
+ return Pages.toObjects(schema, output.pages);
}
}