src/test/java/org/embulk/output/hdfs/TestHdfsFileOutputPlugin.java in embulk-output-hdfs-0.2.2 vs src/test/java/org/embulk/output/hdfs/TestHdfsFileOutputPlugin.java in embulk-output-hdfs-0.2.3
- old
+ new
@@ -1,5 +1,280 @@
package org.embulk.output.hdfs;
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.embulk.EmbulkTestRuntime;
+import org.embulk.config.ConfigException;
+import org.embulk.config.ConfigSource;
+import org.embulk.config.TaskReport;
+import org.embulk.config.TaskSource;
+import org.embulk.spi.Exec;
+import org.embulk.spi.FileOutputRunner;
+import org.embulk.spi.OutputPlugin.Control;
+import org.embulk.spi.Page;
+import org.embulk.spi.PageTestUtils;
+import org.embulk.spi.Schema;
+import org.embulk.spi.TransactionalPageOutput;
+import org.embulk.spi.time.Timestamp;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+import static com.google.common.io.Files.readLines;
+import static org.embulk.output.hdfs.HdfsFileOutputPlugin.*;
+import static org.embulk.spi.type.Types.*;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+import static org.msgpack.value.ValueFactory.newMap;
+import static org.msgpack.value.ValueFactory.newString;
+
public class TestHdfsFileOutputPlugin
{
+    @Rule
+    public EmbulkTestRuntime runtime = new EmbulkTestRuntime();
+
+    @Rule
+    public ExpectedException exception = ExpectedException.none();
+
+    @Rule
+    public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+    private Logger logger = runtime.getExec().getLogger(TestHdfsFileOutputPlugin.class);
+    private HdfsFileOutputPlugin plugin;
+    private FileOutputRunner runner;
+    private String pathPrefix;
+
+    private static final Schema SCHEMA = new Schema.Builder()
+            .add("_c0", BOOLEAN)
+            .add("_c1", LONG)
+            .add("_c2", DOUBLE)
+            .add("_c3", STRING)
+            .add("_c4", TIMESTAMP)
+            .add("_c5", JSON)
+            .build();
+
+    @Before
+    public void createResources()
+            throws IOException
+    {
+        plugin = new HdfsFileOutputPlugin();
+        runner = new FileOutputRunner(runtime.getInstance(HdfsFileOutputPlugin.class));
+        pathPrefix = tmpFolder.getRoot().getAbsolutePath() + "/embulk-output-hdfs_";
+    }
+
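+    // Minimal configuration for this plugin: the hdfs output type, a path_prefix under
+    // the temporary folder, and a csv formatter; each test adds its own overrides on top.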
+    private ConfigSource getBaseConfigSource()
+    {
+        return Exec.newConfigSource()
+                .set("type", "hdfs")
+                .set("path_prefix", pathPrefix)
+                .set("file_ext", "csv")
+                .setNested("formatter", Exec.newConfigSource()
+                        .set("type", "csv")
+                        .set("newline", "CRLF")
+                        .set("newline_in_field", "LF")
+                        .set("header_line", true)
+                        .set("charset", "UTF-8")
+                        .set("quote_policy", "NONE")
+                        .set("quote", "\"")
+                        .set("escape", "\\")
+                        .set("null_string", "")
+                        .set("default_timezone", "UTC"));
+    }
+
+    @Test
+    public void testDefaultValues()
+    {
+        ConfigSource config = getBaseConfigSource();
+        PluginTask task = config.loadConfig(PluginTask.class);
+        assertEquals(pathPrefix, task.getPathPrefix());
+        assertEquals("csv", task.getFileExt());
+        assertEquals("%03d.%02d.", task.getSequenceFormat());
+        assertEquals(Lists.newArrayList(), task.getConfigFiles());
+        assertEquals(Maps.newHashMap(), task.getConfig());
+        assertEquals(0, task.getRewindSeconds());
+        assertEquals(false, task.getOverwrite());
+        assertEquals(Optional.absent(), task.getDoas());
+        assertEquals(PluginTask.DeleteInAdvancePolicy.NONE, task.getDeleteInAdvance());
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testRequiredValues()
+    {
+        // path_prefix is required, so loading an empty config must throw ConfigException
+        ConfigSource config = Exec.newConfigSource();
+        config.loadConfig(PluginTask.class);
+    }
+
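+    // Recursively collects the absolute paths of all files and directories under dir.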
+    private List<String> lsR(List<String> names, java.nio.file.Path dir)
+    {
+        try (DirectoryStream<java.nio.file.Path> stream = Files.newDirectoryStream(dir)) {
+            for (java.nio.file.Path path : stream) {
+                if (path.toFile().isDirectory()) {
+                    logger.debug("[lsR] found a directory: {}", path.toAbsolutePath().toString());
+                    names.add(path.toAbsolutePath().toString());
+                    lsR(names, path);
+                }
+                else {
+                    logger.debug("[lsR] found a file: {}", path.toAbsolutePath().toString());
+                    names.add(path.toAbsolutePath().toString());
+                }
+            }
+        }
+        catch (IOException e) {
+            logger.debug(e.getMessage(), e);
+        }
+        return names;
+    }
+
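+    // Runs the output transaction once, feeding two fixed records built with PageTestUtils.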
+    private void run(ConfigSource config)
+    {
+        runner.transaction(config, SCHEMA, 1, new Control()
+        {
+            @Override
+            public List<TaskReport> run(TaskSource taskSource)
+            {
+                TransactionalPageOutput pageOutput = runner.open(taskSource, SCHEMA, 1);
+                boolean committed = false;
+                try {
+                    // Result:
+                    //   _c0,_c1,_c2,_c3,_c4,_c5
+                    //   true,2,3.0,45,1970-01-01 00:00:00.678000 +0000,{"k":"v"}
+                    //   true,2,3.0,45,1970-01-01 00:00:00.678000 +0000,{"k":"v"}
+                    for (Page page : PageTestUtils.buildPage(runtime.getBufferAllocator(), SCHEMA,
+                            true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")),
+                            true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")))) {
+                        pageOutput.add(page);
+                    }
+                    pageOutput.commit();
+                    committed = true;
+                }
+                finally {
+                    if (!committed) {
+                        pageOutput.abort();
+                    }
+                    pageOutput.close();
+                }
+                return Lists.newArrayList();
+            }
+        });
+    }
+
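+    // Reads the written csv file and checks the header line and every record line.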
+    private void assertRecordsInFile(String filePath)
+    {
+        try {
+            List<String> lines = readLines(new File(filePath), Charsets.UTF_8);
+            for (int i = 0; i < lines.size(); i++) {
+                String[] record = lines.get(i).split(",");
+                if (i == 0) {
+                    // header line: _c0,_c1,_c2,_c3,_c4,_c5
+                    for (int j = 0; j <= 5; j++) {
+                        assertEquals("_c" + j, record[j]);
+                    }
+                }
+                else {
+                    // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000,{"k":"v"}
+                    assertEquals("true", record[0]);
+                    assertEquals("2", record[1]);
+                    assertEquals("3.0", record[2]);
+                    assertEquals("45", record[3]);
+                    assertEquals("1970-01-01 00:00:00.678000 +0000", record[4]);
+                    assertEquals("{\"k\":\"v\"}", record[5]);
+                }
+            }
+        }
+        catch (IOException e) {
+            // fail loudly instead of swallowing the error, so a missing output file breaks the test
+            throw new RuntimeException(e);
+        }
+    }
+
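+    // Writes to the local filesystem via RawLocalFileSystem and verifies the output file.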
+    @Test
+    public void testBulkLoad()
+    {
+        ConfigSource config = getBaseConfigSource()
+                .setNested("config", Exec.newConfigSource()
+                        .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
+                        .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
+                        .set("fs.defaultFS", "file:///"));
+
+        run(config);
+        List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
+        assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.csv")));
+        // pathPrefix is already an absolute path under tmpFolder, so it is the full path prefix of the output file
+        assertRecordsInFile(pathPrefix + "001.00.csv");
+    }
+
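+    // delete_in_advance: RECURSIVE should remove pre-existing files and directories
+    // under the output directory before the new file is written.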
+    @Test
+    public void testDeleteRECURSIVEInAdvance()
+            throws IOException
+    {
+        for (int n = 0; n <= 10; n++) {
+            tmpFolder.newFile("embulk-output-hdfs_file_" + n + ".txt");
+            tmpFolder.newFolder("embulk-output-hdfs_directory_" + n);
+        }
+
+        List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
+
+        ConfigSource config = getBaseConfigSource()
+                .setNested("config", Exec.newConfigSource()
+                        .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
+                        .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
+                        .set("fs.defaultFS", "file:///"))
+                .set("delete_in_advance", "RECURSIVE");
+
+        run(config);
+
+        List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
+        assertNotEquals(fileListBeforeRun, fileListAfterRun);
+        assertThat(fileListAfterRun, not(hasItem(containsString("embulk-output-hdfs_directory"))));
+        assertThat(fileListAfterRun, not(hasItem(containsString("txt"))));
+        assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv")));
+        assertRecordsInFile(pathPrefix + "001.00.csv");
+    }
+
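+    // delete_in_advance: FILE_ONLY should remove pre-existing files but keep directories.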
+    @Test
+    public void testDeleteFILE_ONLYInAdvance()
+            throws IOException
+    {
+        for (int n = 0; n <= 10; n++) {
+            tmpFolder.newFile("embulk-output-hdfs_file_" + n + ".txt");
+            tmpFolder.newFolder("embulk-output-hdfs_directory_" + n);
+        }
+
+        List<String> fileListBeforeRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
+
+        ConfigSource config = getBaseConfigSource()
+                .setNested("config", Exec.newConfigSource()
+                        .set("fs.hdfs.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
+                        .set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
+                        .set("fs.defaultFS", "file:///"))
+                .set("delete_in_advance", "FILE_ONLY");
+
+        run(config);
+
+        List<String> fileListAfterRun = lsR(Lists.<String>newArrayList(), Paths.get(tmpFolder.getRoot().getAbsolutePath()));
+        assertNotEquals(fileListBeforeRun, fileListAfterRun);
+        assertThat(fileListAfterRun, not(hasItem(containsString("txt"))));
+        assertThat(fileListAfterRun, hasItem(containsString("embulk-output-hdfs_directory")));
+        assertThat(fileListAfterRun, hasItem(containsString(pathPrefix + "001.00.csv")));
+        assertRecordsInFile(pathPrefix + "001.00.csv");
+    }
}