src/test/java/org/embulk/output/sftp/TestSftpFileOutputPlugin.java in embulk-output-sftp-0.0.2 vs src/test/java/org/embulk/output/sftp/TestSftpFileOutputPlugin.java in embulk-output-sftp-0.0.3
- old
+ new
@@ -1,5 +1,396 @@
package org.embulk.output.sftp;
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.io.Resources;
+import org.apache.commons.vfs2.FileSystemException;
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.file.virtualfs.VirtualFileSystemFactory;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.auth.password.PasswordAuthenticator;
+import org.apache.sshd.server.auth.pubkey.PublickeyAuthenticator;
+import org.apache.sshd.server.command.ScpCommandFactory;
+import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
+import org.apache.sshd.server.session.ServerSession;
+import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
+import org.embulk.EmbulkTestRuntime;
+import org.embulk.config.ConfigLoader;
+import org.embulk.config.ConfigSource;
+import org.embulk.config.TaskReport;
+import org.embulk.config.TaskSource;
+import org.embulk.output.sftp.SftpFileOutputPlugin.PluginTask;
+import org.embulk.spi.Exec;
+import org.embulk.spi.FileOutputRunner;
+import org.embulk.spi.OutputPlugin.Control;
+import org.embulk.spi.Page;
+import org.embulk.spi.PageTestUtils;
+import org.embulk.spi.Schema;
+import org.embulk.spi.TransactionalPageOutput;
+import org.embulk.spi.time.Timestamp;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.Matcher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.security.PublicKey;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static com.google.common.io.Files.readLines;
+import static org.embulk.spi.type.Types.*;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.*;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+//import static org.hamcrest.Matchers.*;
+
public class TestSftpFileOutputPlugin
{
+ @Rule
+ public EmbulkTestRuntime runtime = new EmbulkTestRuntime();
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @Rule
+ public TemporaryFolder testFolder = new TemporaryFolder();
+
+ private Logger logger = runtime.getExec().getLogger(TestSftpFileOutputPlugin.class);
+ private FileOutputRunner runner;
+ private SshServer sshServer;
+ private static final String HOST = "127.0.0.1";
+ private static final int PORT = 20022;
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final String SECRET_KEY_FILE = Resources.getResource("id_rsa").getPath();
+ private static final String SECRET_KEY_PASSPHRASE = "SECRET_KEY_PASSPHRASE";
+ private static final Schema SCHEMA = new Schema.Builder()
+ .add("_c0", BOOLEAN)
+ .add("_c1", LONG)
+ .add("_c2", DOUBLE)
+ .add("_c3", STRING)
+ .add("_c4", TIMESTAMP)
+ .build();
+
+ @Before
+ public void createResources()
+ throws IOException
+ {
+ // setup the plugin
+ SftpFileOutputPlugin sftpFileOutputPlugin = new SftpFileOutputPlugin();
+ runner = new FileOutputRunner(sftpFileOutputPlugin);
+
+ // setup a mock sftp server
+ sshServer = SshServer.setUpDefaultServer();
+ VirtualFileSystemFactory fsFactory = new VirtualFileSystemFactory();
+ fsFactory.setUserHomeDir(USERNAME, testFolder.getRoot().getAbsolutePath());
+ sshServer.setFileSystemFactory(fsFactory);
+ sshServer.setHost(HOST);
+ sshServer.setPort(PORT);
+ sshServer.setSubsystemFactories(Collections.<NamedFactory<Command>>singletonList(new SftpSubsystemFactory()));
+ sshServer.setCommandFactory(new ScpCommandFactory());
+ sshServer.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
+ sshServer.setPasswordAuthenticator(new PasswordAuthenticator()
+ {
+ @Override
+ public boolean authenticate(final String username, final String password, final ServerSession session)
+ {
+ return USERNAME.contentEquals(username) && PASSWORD.contentEquals(password);
+ }
+ });
+ sshServer.setPublickeyAuthenticator(new PublickeyAuthenticator()
+ {
+ @Override
+ public boolean authenticate(String username, PublicKey key, ServerSession session)
+ {
+ return true;
+ }
+ });
+
+ try {
+ sshServer.start();
+ }
+ catch (IOException e) {
+ logger.debug(e.getMessage(), e);
+ }
+ }
+
+ @After
+ public void cleanup() throws InterruptedException {
+ try {
+ sshServer.stop(true);
+ }
+ catch (Exception e) {
+ logger.debug(e.getMessage(), e);
+ }
+ }
+
+ private ConfigSource getConfigFromYaml(String yaml)
+ {
+ ConfigLoader loader = new ConfigLoader(Exec.getModelManager());
+ return loader.fromYamlString(yaml);
+ }
+
+ private List<String> lsR(List<String> fileNames, Path dir) {
+ try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
+ for (Path path : stream) {
+ if(path.toFile().isDirectory()) {
+ lsR(fileNames, path);
+ } else {
+ fileNames.add(path.toAbsolutePath().toString());
+ }
+ }
+ }
+ catch(IOException e) {
+ logger.debug(e.getMessage(), e);
+ }
+ return fileNames;
+ }
+
+ private void run(String configYaml, final Optional<Integer> sleep)
+ {
+ ConfigSource config = getConfigFromYaml(configYaml);
+ runner.transaction(config, SCHEMA, 1, new Control()
+ {
+ @Override
+ public List<TaskReport> run(TaskSource taskSource)
+ {
+ TransactionalPageOutput pageOutput = runner.open(taskSource, SCHEMA, 1);
+ boolean committed = false;
+ try {
+ // Result:
+ // _c0,_c1,_c2,_c3,_c4
+ // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000
+ // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000
+ for (Page page : PageTestUtils.buildPage(runtime.getBufferAllocator(), SCHEMA, true, 2L, 3.0D, "45",
+ Timestamp.ofEpochMilli(678L), true, 2L, 3.0D, "45",
+ Timestamp.ofEpochMilli(678L))) {
+ pageOutput.add(page);
+ if (sleep.isPresent()) {
+ Thread.sleep(sleep.get() * 1000);
+ }
+ }
+ pageOutput.commit();
+ committed = true;
+ }
+ catch (InterruptedException e) {
+ logger.debug(e.getMessage(), e);
+ }
+ finally {
+ if (!committed) {
+ pageOutput.abort();
+ }
+ pageOutput.close();
+ }
+ return Lists.newArrayList();
+ }
+ });
+ }
+
+ private void assertRecordsInFile(String filePath)
+ {
+ try {
+ List<String> lines = readLines(new File(filePath),
+ Charsets.UTF_8);
+ for (int i = 0; i < lines.size(); i++) {
+ String[] record = lines.get(i).split(",");
+ if (i == 0) {
+ for (int j = 0; j <= 4 ; j++) {
+ assertEquals("_c" + j, record[j]);
+ }
+ }
+ else {
+ // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000
+ assertEquals("true", record[0]);
+ assertEquals("2", record[1]);
+ assertEquals("3.0", record[2]);
+ assertEquals("45", record[3]);
+ assertEquals("1970-01-01 00:00:00.678000 +0000", record[4]);
+ }
+ }
+ }
+ catch (IOException e) {
+ logger.debug(e.getMessage(), e);
+ }
+ }
+
+ @Test
+ public void testConfigValuesIncludingDefault()
+ {
+ // setting embulk config
+ final String pathPrefix = "/test/testUserPassword";
+ String configYaml = "" +
+ "type: sftp\n" +
+ "host: " + HOST + "\n" +
+ "user: " + USERNAME + "\n" +
+ "path_prefix: " + pathPrefix + "\n" +
+ "file_ext: txt\n" +
+ "formatter:\n" +
+ " type: csv\n" +
+ " newline: CRLF\n" +
+ " newline_in_field: LF\n" +
+ " header_line: true\n" +
+ " charset: UTF-8\n" +
+ " quote_policy: NONE\n" +
+ " quote: \"\\\"\"\n" +
+ " escape: \"\\\\\"\n" +
+ " null_string: \"\"\n" +
+ " default_timezone: 'UTC'";
+
+ ConfigSource config = getConfigFromYaml(configYaml);
+ PluginTask task = config.loadConfig(PluginTask.class);
+
+ assertEquals(HOST, task.getHost());
+ assertEquals(22, task.getPort());
+ assertEquals(USERNAME, task.getUser());
+ assertEquals(Optional.absent(), task.getPassword());
+ assertEquals(Optional.absent(), task.getSecretKeyFilePath());
+ assertEquals("", task.getSecretKeyPassphrase());
+ assertEquals(true, task.getUserDirIsRoot());
+ assertEquals(600, task.getSftpConnectionTimeout());
+ assertEquals(5, task.getMaxConnectionRetry());
+ assertEquals(pathPrefix, task.getPathPrefix());
+ assertEquals("txt", task.getFileNameExtension());
+ assertEquals("%03d.%02d.", task.getSequenceFormat());
+ }
+
+ // Cases
+ // login(all cases needs host + port)
+ // user + password
+ // user + secret_key_file + secret_key_passphrase
+ // put files
+ // user_directory_is_root
+ // not user_directory_is_root
+ // timeout
+ // 0 second
+
+
+ @Test
+ public void testUserPasswordAndPutToUserDirectoryRoot()
+ {
+ // setting embulk config
+ final String pathPrefix = "/test/testUserPassword";
+ String configYaml = "" +
+ "type: sftp\n" +
+ "host: " + HOST + "\n" +
+ "port: " + PORT + "\n" +
+ "user: " + USERNAME + "\n" +
+ "password: " + PASSWORD + "\n" +
+ "path_prefix: " + pathPrefix + "\n" +
+ "file_ext: txt\n" +
+ "formatter:\n" +
+ " type: csv\n" +
+ " newline: CRLF\n" +
+ " newline_in_field: LF\n" +
+ " header_line: true\n" +
+ " charset: UTF-8\n" +
+ " quote_policy: NONE\n" +
+ " quote: \"\\\"\"\n" +
+ " escape: \"\\\\\"\n" +
+ " null_string: \"\"\n" +
+ " default_timezone: 'UTC'";
+
+ // runner.transaction -> ...
+ run(configYaml, Optional.<Integer>absent());
+
+ List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath()));
+ assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt")));
+ assertRecordsInFile(String.format("%s/%s001.00.txt",
+ testFolder.getRoot().getAbsolutePath(),
+ pathPrefix));
+
+ }
+
+ @Test
+ public void testUserSecretKeyFileAndPutToRootDirectory()
+ {
+ // setting embulk config
+ final String pathPrefix = "/test/testUserPassword";
+ String configYaml = "" +
+ "type: sftp\n" +
+ "host: " + HOST + "\n" +
+ "port: " + PORT + "\n" +
+ "user: " + USERNAME + "\n" +
+ "secret_key_file: " + SECRET_KEY_FILE + "\n" +
+ "secret_key_passphrase: " + SECRET_KEY_PASSPHRASE + "\n" +
+ "path_prefix: " + testFolder.getRoot().getAbsolutePath() + pathPrefix + "\n" +
+ "file_ext: txt\n" +
+ "formatter:\n" +
+ " type: csv\n" +
+ " newline: CRLF\n" +
+ " newline_in_field: LF\n" +
+ " header_line: true\n" +
+ " charset: UTF-8\n" +
+ " quote_policy: NONE\n" +
+ " quote: \"\\\"\"\n" +
+ " escape: \"\\\\\"\n" +
+ " null_string: \"\"\n" +
+ " default_timezone: 'UTC'";
+
+ // runner.transaction -> ...
+ run(configYaml, Optional.<Integer>absent());
+
+ List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath()));
+ assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt")));
+
+ assertRecordsInFile(String.format("%s/%s001.00.txt",
+ testFolder.getRoot().getAbsolutePath(),
+ pathPrefix));
+ }
+
+ @Test
+ public void testTimeout()
+ {
+ // setting embulk config
+ final String pathPrefix = "/test/testUserPassword";
+ String configYaml = "" +
+ "type: sftp\n" +
+ "host: " + HOST + "\n" +
+ "port: " + PORT + "\n" +
+ "user: " + USERNAME + "\n" +
+ "secret_key_file: " + SECRET_KEY_FILE + "\n" +
+ "secret_key_passphrase: " + SECRET_KEY_PASSPHRASE + "\n" +
+ "path_prefix: " + testFolder.getRoot().getAbsolutePath() + pathPrefix + "\n" +
+ "timeout: 1\n" +
+ "file_ext: txt\n" +
+ "formatter:\n" +
+ " type: csv\n" +
+ " newline: CRLF\n" +
+ " newline_in_field: LF\n" +
+ " header_line: true\n" +
+ " charset: UTF-8\n" +
+ " quote_policy: NONE\n" +
+ " quote: \"\\\"\"\n" +
+ " escape: \"\\\\\"\n" +
+ " null_string: \"\"\n" +
+ " default_timezone: 'UTC'";
+
+ // exception
+ exception.expect(RuntimeException.class);
+ exception.expectCause(CoreMatchers.<Throwable>instanceOf(FileSystemException.class));
+ exception.expectMessage("Could not connect to SFTP server");
+
+ // runner.transaction -> ...
+ run(configYaml, Optional.of(60)); // sleep 1 minute while processing
+ }
}