src/test/java/org/embulk/output/hdfs/TestHdfsFileOutputPlugin.java in embulk-output-hdfs-0.2.4 vs src/test/java/org/embulk/output/hdfs/TestHdfsFileOutputPlugin.java in embulk-output-hdfs-0.3.0
- old
+ new
@@ -2,10 +2,11 @@
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.embulk.EmbulkTestRuntime;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
@@ -15,10 +16,11 @@
import org.embulk.spi.Page;
import org.embulk.spi.PageTestUtils;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
import org.embulk.spi.time.Timestamp;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
@@ -35,13 +37,15 @@
import static org.embulk.output.hdfs.HdfsFileOutputPlugin.*;
import static org.embulk.spi.type.Types.*;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.msgpack.value.ValueFactory.newMap;
import static org.msgpack.value.ValueFactory.newString;
public class TestHdfsFileOutputPlugin
{
@@ -105,22 +109,30 @@
assertEquals("csv", task.getFileExt());
assertEquals("%03d.%02d.", task.getSequenceFormat());
assertEquals(Lists.newArrayList(), task.getConfigFiles());
assertEquals(Maps.newHashMap(), task.getConfig());
assertEquals(0, task.getRewindSeconds());
- assertEquals(false, task.getOverwrite());
+ assertEquals(Optional.absent(), task.getOverwrite());
assertEquals(Optional.absent(), task.getDoas());
- assertEquals(PluginTask.DeleteInAdvancePolicy.NONE, task.getDeleteInAdvance());
+ assertEquals(Optional.absent(), task.getDeleteInAdvance());
}
@Test(expected = ConfigException.class)
public void testRequiredValues()
{
ConfigSource config = Exec.newConfigSource();
PluginTask task = config.loadConfig(PluginTask.class);
}
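+ // "replace" mode presumably stages output and swaps it into place, so a
+ // sequence_format containing a path separator ("%d/%d") must be rejected
+ // with a ConfigException before any files are written.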
+ @Test(expected = ConfigException.class)
+ public void testSequenceFormatMode_replace()
+ {
+ run(getBaseConfigSource()
+ .set("mode", "replace")
+ .set("sequence_format", "%d/%d"));
+ }
+
private List<String> lsR(List<String> names, java.nio.file.Path dir)
{
try (DirectoryStream<java.nio.file.Path> stream = Files.newDirectoryStream(dir)) {
for (java.nio.file.Path path : stream) {
if (path.toFile().isDirectory()) {
@@ -199,80 +211,199 @@
catch (IOException e) {
logger.debug(e.getMessage(), e);
}
}
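+ // Backs both the hdfs:// and file:// schemes with RawLocalFileSystem so the
+ // tests run against the local filesystem instead of a live HDFS cluster.
+ // fs.trash.interval is non-zero, presumably so deletes can go through the
+ // Hadoop trash rather than being removed outright.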
+ private ConfigSource getDefaultFsConfig()
+ {
+ return Exec.newConfigSource()
+ .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
+ .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
+ .set("fs.trash.interval", "3600")
+ .set("fs.defaultFS", "file:///");
+ }
+
@Test
public void testBulkLoad()
{
ConfigSource config = getBaseConfigSource()
- .setNested("config", Exec.newConfigSource()
- .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
- .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
- .set("fs.defaultFS", "file:///"));
+ .setNested("config", getDefaultFsConfig());
run(config);
List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.csv")));
assertRecordsInFile(String.format("%s/%s001.00.csv",
tmpFolder.getRoot().getAbsolutePath(),
pathPrefix));
}
@Test
- public void testDeleteRECURSIVEInAdvance()
+ public void testDeleteInAdvance_RECURSIVE()
throws IOException
{
for (int n = 0; n <= 10; n++) {
- tmpFolder.newFile("embulk-output-hdfs_file_" + n + ".txt");
- tmpFolder.newFolder("embulk-output-hdfs_directory_" + n);
+ tmpFolder.newFile("embulk-output-hdfs_testDeleteInAdvance_RECURSIVE_file_" + n + ".txt");
+ tmpFolder.newFolder("embulk-output-hdfs_testDeleteInAdvance_RECURSIVE_directory_" + n);
}
List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
ConfigSource config = getBaseConfigSource()
- .setNested("config", Exec.newConfigSource()
- .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
- .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
- .set("fs.defaultFS", "file:///"))
+ .setNested("config", getDefaultFsConfig())
.set("delete_in_advance", "RECURSIVE");
run(config);
List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
assertNotEquals(fileListBeforeRun, fileListAfterRun);
- assertThat(fileListAfterRun, not(hasItem(containsString("embulk-output-hdfs_directory"))));
+ assertThat(fileListAfterRun, not(hasItem(containsString("embulk-output-hdfs_testDeleteInAdvance_RECURSIVE_directory_"))));
assertThat(fileListAfterRun, not(hasItem(containsString("txt"))));
assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv")));
assertRecordsInFile(String.format("%s/%s001.00.csv",
tmpFolder.getRoot().getAbsolutePath(),
pathPrefix));
}
@Test
- public void testDeleteFILE_ONLYInAdvance()
+ public void testDeleteInAdvance_FILE_ONLY()
throws IOException
{
for (int n = 0; n <= 10; n++) {
- tmpFolder.newFile("embulk-output-hdfs_file_" + n + ".txt");
- tmpFolder.newFolder("embulk-output-hdfs_directory_" + n);
+ tmpFolder.newFile("embulk-output-hdfs_testDeleteInAdvance_FILE_ONLY_file_" + n + ".txt");
+ tmpFolder.newFolder("embulk-output-hdfs_testDeleteInAdvance_FILE_ONLY_directory_" + n);
}
List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
ConfigSource config = getBaseConfigSource()
- .setNested("config", Exec.newConfigSource()
- .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
- .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
- .set("fs.defaultFS", "file:///"))
+ .setNested("config", getDefaultFsConfig())
.set("delete_in_advance", "FILE_ONLY");
run(config);
List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
assertNotEquals(fileListBeforeRun, fileListAfterRun);
assertThat(fileListAfterRun, not(hasItem(containsString("txt"))));
- assertThat(fileListAfterRun, hasItem(containsString("embulk-output-hdfs_directory")));
+ assertThat(fileListAfterRun, hasItem(containsString("embulk-output-hdfs_testDeleteInAdvance_FILE_ONLY_directory_")));
+ assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv")));
+ assertRecordsInFile(String.format("%s/%s001.00.csv",
+ tmpFolder.getRoot().getAbsolutePath(),
+ pathPrefix));
+ }
+
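+ // The new "mode" option subsumes delete_in_advance (note getDeleteInAdvance()
+ // is now Optional): "delete_recursive_in_advance" should behave exactly like
+ // delete_in_advance: RECURSIVE above.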
+ @Test
+ public void testMode_delete_recursive_in_advance()
+ throws IOException
+ {
+ for (int n = 0; n <= 10; n++) {
+ tmpFolder.newFile("embulk-output-hdfs_testMode_delete_recursive_in_advance_file_" + n + ".txt");
+ tmpFolder.newFolder("embulk-output-hdfs_testMode_delete_recursive_in_advance_directory_" + n);
+ }
+
+ List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
+
+ ConfigSource config = getBaseConfigSource()
+ .setNested("config", getDefaultFsConfig())
+ .set("mode", "delete_recursive_in_advance");
+
+ run(config);
+
+ List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
+ assertNotEquals(fileListBeforeRun, fileListAfterRun);
+ assertThat(fileListAfterRun, not(hasItem(containsString("embulk-output-hdfs_testMode_delete_recursive_in_advance_directory_"))));
+ assertThat(fileListAfterRun, not(hasItem(containsString("txt"))));
+ assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv")));
+ assertRecordsInFile(String.format("%s/%s001.00.csv",
+ tmpFolder.getRoot().getAbsolutePath(),
+ pathPrefix));
+ }
+
+ @Test
+ public void testMode_delete_files_in_advance()
+ throws IOException
+ {
+ for (int n = 0; n <= 10; n++) {
+ tmpFolder.newFile("embulk-output-hdfs_testMode_delete_files_in_advance_file_" + n + ".txt");
+ tmpFolder.newFolder("embulk-output-hdfs_testMode_delete_files_in_advance_directory_" + n);
+ }
+
+ List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
+
+ ConfigSource config = getBaseConfigSource()
+ .setNested("config", getDefaultFsConfig())
+ .set("mode", "delete_files_in_advance");
+
+ run(config);
+
+ List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
+ assertNotEquals(fileListBeforeRun, fileListAfterRun);
+ assertThat(fileListAfterRun, not(hasItem(containsString("txt"))));
+ assertThat(fileListAfterRun, hasItem(containsString("embulk-output-hdfs_testMode_delete_files_in_advance_directory_")));
+ assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv")));
+ assertRecordsInFile(String.format("%s/%s001.00.csv",
+ tmpFolder.getRoot().getAbsolutePath(),
+ pathPrefix));
+ }
+
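+ // "abort_if_exist": writing over an existing output must fail rather than
+ // silently clobber it.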
+ @Test
+ public void testMode_abort_if_exist()
+ throws IOException
+ {
+ ConfigSource config = getBaseConfigSource()
+ .setNested("config", getDefaultFsConfig())
+ .set("mode", "abort_if_exist");
+
+ // The first run creates the output; the second run must abort because the
+ // destination files already exist.
+ run(config);
+ try {
+ run(config);
+ Assert.fail("The second run should have thrown FileAlreadyExistsException.");
+ }
+ catch (Exception e) {
+ // The runner wraps the original exception, so walk the cause chain.
+ for (Throwable t = e; t != null; t = t.getCause()) {
+ if (t instanceof FileAlreadyExistsException) {
+ return;
+ }
+ }
+ Assert.fail("FileAlreadyExistsException is not in the cause chain.");
+ }
+ }
+
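+ // "overwrite": the mode counterpart of the old overwrite: true flag (note
+ // getOverwrite() is now Optional).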
+ @Test
+ public void testMode_overwrite()
+ throws IOException
+ {
+ ConfigSource config = getBaseConfigSource()
+ .setNested("config", getDefaultFsConfig())
+ .set("mode", "overwrite");
+
+ // Running the same job twice must succeed: the second run overwrites the
+ // output of the first instead of failing.
+ run(config);
+ run(config);
+ assertRecordsInFile(String.format("%s/%s001.00.csv",
+ tmpFolder.getRoot().getAbsolutePath(),
+ pathPrefix));
+ }
+
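+ // "replace": pre-existing files and directories under the output path are
+ // gone after the run, like delete_recursive_in_advance, but the new output
+ // is still committed.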
+ @Test
+ public void testMode_replace()
+ throws IOException
+ {
+ for (int n = 0; n <= 10; n++) {
+ tmpFolder.newFile("embulk-output-hdfs_testMode_delete_recursive_in_advance_file_" + n + ".txt");
+ tmpFolder.newFolder("embulk-output-hdfs_testMode_delete_recursive_in_advance_directory_" + n);
+ }
+
+ List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
+
+ run(getBaseConfigSource()
+ .setNested("config", getDefaultFsConfig())
+ .set("mode", "replace"));
+
+ List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
+ assertNotEquals(fileListBeforeRun, fileListAfterRun);
+ assertThat(fileListAfterRun, not(hasItem(containsString("embulk-output-hdfs_testMode_replace_directory_"))));
+ assertThat(fileListAfterRun, not(hasItem(containsString("txt"))));
assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv")));
assertRecordsInFile(String.format("%s/%s001.00.csv",
tmpFolder.getRoot().getAbsolutePath(),
pathPrefix));
}