src/test/java/org/embulk/output/hdfs/TestHdfsFileOutputPlugin.java in embulk-output-hdfs-0.2.4 vs src/test/java/org/embulk/output/hdfs/TestHdfsFileOutputPlugin.java in embulk-output-hdfs-0.3.0

- old
+ new

@@ -2,10 +2,11 @@ import com.google.common.base.Charsets; import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.embulk.EmbulkTestRuntime; import org.embulk.config.ConfigException; import org.embulk.config.ConfigSource; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; @@ -15,10 +16,11 @@ import org.embulk.spi.Page; import org.embulk.spi.PageTestUtils; import org.embulk.spi.Schema; import org.embulk.spi.TransactionalPageOutput; import org.embulk.spi.time.Timestamp; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; @@ -35,13 +37,15 @@ import static org.embulk.output.hdfs.HdfsFileOutputPlugin.*; import static org.embulk.spi.type.Types.*; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.core.Is.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.msgpack.value.ValueFactory.newMap; import static org.msgpack.value.ValueFactory.newString; public class TestHdfsFileOutputPlugin { @@ -105,22 +109,30 @@ assertEquals("csv", task.getFileExt()); assertEquals("%03d.%02d.", task.getSequenceFormat()); assertEquals(Lists.newArrayList(), task.getConfigFiles()); assertEquals(Maps.newHashMap(), task.getConfig()); assertEquals(0, task.getRewindSeconds()); - assertEquals(false, task.getOverwrite()); + assertEquals(Optional.absent(), task.getOverwrite()); assertEquals(Optional.absent(), task.getDoas()); - assertEquals(PluginTask.DeleteInAdvancePolicy.NONE, task.getDeleteInAdvance()); + assertEquals(Optional.absent(), task.getDeleteInAdvance()); } @Test(expected = ConfigException.class) public void testRequiredValues() { ConfigSource config = Exec.newConfigSource(); PluginTask task = config.loadConfig(PluginTask.class); } + @Test(expected = ConfigException.class) + public void testSequenceFormatMode_replace() + { + run(getBaseConfigSource() + .set("mode", "replace") + .set("sequence_format", "%d/%d")); + } + private List<String> lsR(List<String> names, java.nio.file.Path dir) { try (DirectoryStream<java.nio.file.Path> stream = Files.newDirectoryStream(dir)) { for (java.nio.file.Path path : stream) { if (path.toFile().isDirectory()) { @@ -199,80 +211,199 @@ catch (IOException e) { logger.debug(e.getMessage(), e); } } + private ConfigSource getDefaultFsConfig() + { + return Exec.newConfigSource() + .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem") + .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem") + .set("fs.trash.interval", "3600") + .set("fs.defaultFS", "file:///"); + } + @Test public void testBulkLoad() { ConfigSource config = getBaseConfigSource() - .setNested("config", Exec.newConfigSource() - .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem") - .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem") - .set("fs.defaultFS", "file:///")); + .setNested("config", getDefaultFsConfig()); run(config); List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath())); assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.csv"))); assertRecordsInFile(String.format("%s/%s001.00.csv", tmpFolder.getRoot().getAbsolutePath(), pathPrefix)); } @Test - public void testDeleteRECURSIVEInAdvance() + public void testDeleteInAdvance_RECURSIVE() throws IOException { for (int n = 0; n <= 10; n++) { - tmpFolder.newFile("embulk-output-hdfs_file_" + n + ".txt"); - tmpFolder.newFolder("embulk-output-hdfs_directory_" + n); + tmpFolder.newFile("embulk-output-hdfs_testDeleteInAdvance_RECURSIVE_file_" + n + ".txt"); + tmpFolder.newFolder("embulk-output-hdfs_testDeleteInAdvance_RECURSIVE_directory_" + n); } List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath())); ConfigSource config = getBaseConfigSource() - .setNested("config", Exec.newConfigSource() - .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem") - .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem") - .set("fs.defaultFS", "file:///")) + .setNested("config", getDefaultFsConfig()) .set("delete_in_advance", "RECURSIVE"); run(config); List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath())); assertNotEquals(fileListBeforeRun, fileListAfterRun); - assertThat(fileListAfterRun, not(hasItem(containsString("embulk-output-hdfs_directory")))); + assertThat(fileListAfterRun, not(hasItem(containsString("embulk-output-hdfs_testDeleteInAdvance_RECURSIVE_directory_")))); assertThat(fileListAfterRun, not(hasItem(containsString("txt")))); assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv"))); assertRecordsInFile(String.format("%s/%s001.00.csv", tmpFolder.getRoot().getAbsolutePath(), pathPrefix)); } @Test - public void testDeleteFILE_ONLYInAdvance() + public void testDeleteInAdvance_FILE_ONLY() throws IOException { for (int n = 0; n <= 10; n++) { - tmpFolder.newFile("embulk-output-hdfs_file_" + n + ".txt"); - tmpFolder.newFolder("embulk-output-hdfs_directory_" + n); + tmpFolder.newFile("embulk-output-hdfs_testDeleteInAdvance_FILE_ONLY_file_" + n + ".txt"); + tmpFolder.newFolder("embulk-output-hdfs_testDeleteInAdvance_FILE_ONLY_directory_" + n); } List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath())); ConfigSource config = getBaseConfigSource() - .setNested("config", Exec.newConfigSource() - .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem") - .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem") - .set("fs.defaultFS", "file:///")) + .setNested("config", getDefaultFsConfig()) .set("delete_in_advance", "FILE_ONLY"); run(config); List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath())); assertNotEquals(fileListBeforeRun, fileListAfterRun); assertThat(fileListAfterRun, not(hasItem(containsString("txt")))); - assertThat(fileListAfterRun, hasItem(containsString("embulk-output-hdfs_directory"))); + assertThat(fileListAfterRun, hasItem(containsString("embulk-output-hdfs_testDeleteInAdvance_FILE_ONLY_directory_"))); + assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv"))); + assertRecordsInFile(String.format("%s/%s001.00.csv", + tmpFolder.getRoot().getAbsolutePath(), + pathPrefix)); + } + + @Test + public void testMode_delete_recursive_in_advance() + throws IOException + { + for (int n = 0; n <= 10; n++) { + tmpFolder.newFile("embulk-output-hdfs_testMode_delete_recursive_in_advance_file_" + n + ".txt"); + tmpFolder.newFolder("embulk-output-hdfs_testMode_delete_recursive_in_advance_directory_" + n); + } + + List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath())); + + ConfigSource config = getBaseConfigSource() + .setNested("config", getDefaultFsConfig()) + .set("mode", "delete_recursive_in_advance"); + + run(config); + + List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath())); + assertNotEquals(fileListBeforeRun, fileListAfterRun); + assertThat(fileListAfterRun, not(hasItem(containsString("embulk-output-hdfs_testMode_delete_recursive_in_advance_directory_")))); + assertThat(fileListAfterRun, not(hasItem(containsString("txt")))); + assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv"))); + assertRecordsInFile(String.format("%s/%s001.00.csv", + tmpFolder.getRoot().getAbsolutePath(), + pathPrefix)); + } + + @Test + public void testMode_delete_files_in_advance() + throws IOException + { + for (int n = 0; n <= 10; n++) { + tmpFolder.newFile("embulk-output-hdfs_testMode_delete_files_in_advance_file_" + n + ".txt"); + tmpFolder.newFolder("embulk-output-hdfs_testMode_delete_files_in_advance_directory_" + n); + } + + List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath())); + + ConfigSource config = getBaseConfigSource() + .setNested("config", getDefaultFsConfig()) + .set("mode", "delete_files_in_advance"); + + run(config); + + List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath())); + assertNotEquals(fileListBeforeRun, fileListAfterRun); + assertThat(fileListAfterRun, not(hasItem(containsString("txt")))); + assertThat(fileListAfterRun, hasItem(containsString("embulk-output-hdfs_testMode_delete_files_in_advance_directory_"))); + assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv"))); + assertRecordsInFile(String.format("%s/%s001.00.csv", + tmpFolder.getRoot().getAbsolutePath(), + pathPrefix)); + } + + @Test + public void testMode_abort_if_exist() + throws IOException + { + ConfigSource config = getBaseConfigSource() + .setNested("config", getDefaultFsConfig()) + .set("mode", "abort_if_exist"); + + run(config); + try { + run(config); + } + catch (Exception e) { + Throwable t = e; + while (t != null) { + t = t.getCause(); + if (t.getCause() instanceof FileAlreadyExistsException) { + Assert.assertTrue(true); + return; + } + } + Assert.fail("FileAlreadyExistsException is not cause."); + } + + } + + @Test + public void testMode_overwrite() + throws IOException + { + ConfigSource config = getBaseConfigSource() + .setNested("config", getDefaultFsConfig()) + .set("mode", "overwrite"); + + run(config); + run(config); + Assert.assertTrue(true); + } + + @Test + public void testMode_replace() + throws IOException + { + for (int n = 0; n <= 10; n++) { + tmpFolder.newFile("embulk-output-hdfs_testMode_delete_recursive_in_advance_file_" + n + ".txt"); + tmpFolder.newFolder("embulk-output-hdfs_testMode_delete_recursive_in_advance_directory_" + n); + } + + List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath())); + + + run(getBaseConfigSource() + .set("config", getDefaultFsConfig()) + .set("mode", "replace")); + + List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath())); + assertNotEquals(fileListBeforeRun, fileListAfterRun); + assertThat(fileListAfterRun, not(hasItem(containsString("embulk-output-hdfs_testMode_delete_recursive_in_advance_directory_")))); + assertThat(fileListAfterRun, not(hasItem(containsString("txt")))); assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv"))); assertRecordsInFile(String.format("%s/%s001.00.csv", tmpFolder.getRoot().getAbsolutePath(), pathPrefix)); }