src/test/java/org/embulk/output/sftp/TestSftpFileOutputPlugin.java in embulk-output-sftp-0.0.4 vs src/test/java/org/embulk/output/sftp/TestSftpFileOutputPlugin.java in embulk-output-sftp-0.0.5

- old
+ new

@@ -1,25 +1,24 @@ package org.embulk.output.sftp; import com.google.common.base.Charsets; import com.google.common.base.Optional; -import com.google.common.base.Splitter; -import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.io.Resources; import org.apache.commons.vfs2.FileSystemException; import org.apache.sshd.common.NamedFactory; import org.apache.sshd.common.file.virtualfs.VirtualFileSystemFactory; import org.apache.sshd.server.Command; import org.apache.sshd.server.SshServer; import org.apache.sshd.server.auth.password.PasswordAuthenticator; import org.apache.sshd.server.auth.pubkey.PublickeyAuthenticator; -import org.apache.sshd.server.command.ScpCommandFactory; import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; +import org.apache.sshd.server.scp.ScpCommandFactory; import org.apache.sshd.server.session.ServerSession; import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory; import org.embulk.EmbulkTestRuntime; +import org.embulk.config.ConfigException; import org.embulk.config.ConfigLoader; import org.embulk.config.ConfigSource; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.output.sftp.SftpFileOutputPlugin.PluginTask; @@ -30,41 +29,43 @@ import org.embulk.spi.PageTestUtils; import org.embulk.spi.Schema; import org.embulk.spi.TransactionalPageOutput; import org.embulk.spi.time.Timestamp; import org.hamcrest.CoreMatchers; -import org.hamcrest.Matcher; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import org.littleshoot.proxy.HttpProxyServer; +import org.littleshoot.proxy.impl.DefaultHttpProxyServer; import org.slf4j.Logger; import java.io.File; import java.io.IOException; import java.nio.file.DirectoryStream; -import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.PathMatcher; import java.nio.file.Paths; import java.security.PublicKey; -import java.util.Arrays; import java.util.Collections; import java.util.List; import static com.google.common.io.Files.readLines; -import static org.embulk.spi.type.Types.*; +import static org.embulk.spi.type.Types.BOOLEAN; +import static org.embulk.spi.type.Types.DOUBLE; +import static org.embulk.spi.type.Types.JSON; +import static org.embulk.spi.type.Types.LONG; +import static org.embulk.spi.type.Types.STRING; +import static org.embulk.spi.type.Types.TIMESTAMP; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.hasItem; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.MatcherAssert.*; -import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; -//import static org.hamcrest.Matchers.*; +import static org.junit.Assert.assertThat; +import static org.msgpack.value.ValueFactory.newMap; +import static org.msgpack.value.ValueFactory.newString; public class TestSftpFileOutputPlugin { @Rule public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); @@ -78,46 +79,54 @@ private Logger logger = runtime.getExec().getLogger(TestSftpFileOutputPlugin.class); private FileOutputRunner runner; private SshServer sshServer; private static final String HOST = "127.0.0.1"; private static final int PORT = 20022; + private static final String PROXY_HOST = "127.0.0.1"; + private static final int PROXY_PORT = 8080; private static final String USERNAME = "username"; private static final String PASSWORD = "password"; private static final String SECRET_KEY_FILE = Resources.getResource("id_rsa").getPath(); private static final String SECRET_KEY_PASSPHRASE = "SECRET_KEY_PASSPHRASE"; private static final Schema SCHEMA = new Schema.Builder() .add("_c0", BOOLEAN) .add("_c1", LONG) .add("_c2", DOUBLE) .add("_c3", STRING) .add("_c4", TIMESTAMP) + .add("_c5", JSON) .build(); @Before public void createResources() throws IOException { // setup the plugin SftpFileOutputPlugin sftpFileOutputPlugin = new SftpFileOutputPlugin(); runner = new FileOutputRunner(sftpFileOutputPlugin); + sshServer = createSshServer(HOST, PORT, USERNAME, PASSWORD); + } + + private SshServer createSshServer(String host, int port, final String sshUsername, final String sshPassword) + { // setup a mock sftp server - sshServer = SshServer.setUpDefaultServer(); + SshServer sshServer = SshServer.setUpDefaultServer(); VirtualFileSystemFactory fsFactory = new VirtualFileSystemFactory(); - fsFactory.setUserHomeDir(USERNAME, testFolder.getRoot().getAbsolutePath()); + fsFactory.setUserHomeDir(sshUsername, testFolder.getRoot().toPath()); sshServer.setFileSystemFactory(fsFactory); - sshServer.setHost(HOST); - sshServer.setPort(PORT); + sshServer.setHost(host); + sshServer.setPort(port); sshServer.setSubsystemFactories(Collections.<NamedFactory<Command>>singletonList(new SftpSubsystemFactory())); sshServer.setCommandFactory(new ScpCommandFactory()); sshServer.setKeyPairProvider(new SimpleGeneratorHostKeyProvider()); sshServer.setPasswordAuthenticator(new PasswordAuthenticator() { @Override public boolean authenticate(final String username, final String password, final ServerSession session) { - return USERNAME.contentEquals(username) && PASSWORD.contentEquals(password); + return sshUsername.contentEquals(username) && sshPassword.contentEquals(password); } }); sshServer.setPublickeyAuthenticator(new PublickeyAuthenticator() { @Override @@ -131,14 +140,23 @@ sshServer.start(); } catch (IOException e) { logger.debug(e.getMessage(), e); } + return sshServer; } + private HttpProxyServer createProxyServer(int port) + { + return DefaultHttpProxyServer.bootstrap() + .withPort(port) + .start(); + } + @After - public void cleanup() throws InterruptedException { + public void cleanup() throws InterruptedException + { try { sshServer.stop(true); } catch (Exception e) { logger.debug(e.getMessage(), e); @@ -149,21 +167,23 @@ { ConfigLoader loader = new ConfigLoader(Exec.getModelManager()); return loader.fromYamlString(yaml); } - private List<String> lsR(List<String> fileNames, Path dir) { + private List<String> lsR(List<String> fileNames, Path dir) + { try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) { for (Path path : stream) { - if(path.toFile().isDirectory()) { + if (path.toFile().isDirectory()) { lsR(fileNames, path); - } else { + } + else { fileNames.add(path.toAbsolutePath().toString()); } } } - catch(IOException e) { + catch (IOException e) { logger.debug(e.getMessage(), e); } return fileNames; } @@ -177,16 +197,16 @@ { TransactionalPageOutput pageOutput = runner.open(taskSource, SCHEMA, 1); boolean committed = false; try { // Result: - // _c0,_c1,_c2,_c3,_c4 - // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000 - // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000 - for (Page page : PageTestUtils.buildPage(runtime.getBufferAllocator(), SCHEMA, true, 2L, 3.0D, "45", - Timestamp.ofEpochMilli(678L), true, 2L, 3.0D, "45", - Timestamp.ofEpochMilli(678L))) { + // _c0,_c1,_c2,_c3,_c4,_c5 + // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000,{\"k\":\"v\"} + // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000,{\"k\":\"v\"} + for (Page page : PageTestUtils.buildPage(runtime.getBufferAllocator(), SCHEMA, + true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")), + true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")))) { pageOutput.add(page); if (sleep.isPresent()) { Thread.sleep(sleep.get() * 1000); } } @@ -213,21 +233,22 @@ List<String> lines = readLines(new File(filePath), Charsets.UTF_8); for (int i = 0; i < lines.size(); i++) { String[] record = lines.get(i).split(","); if (i == 0) { - for (int j = 0; j <= 4 ; j++) { + for (int j = 0; j <= 4; j++) { assertEquals("_c" + j, record[j]); } } else { // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000 assertEquals("true", record[0]); assertEquals("2", record[1]); assertEquals("3.0", record[2]); assertEquals("45", record[3]); assertEquals("1970-01-01 00:00:00.678000 +0000", record[4]); + assertEquals("{\"k\":\"v\"}", record[5]); } } } catch (IOException e) { logger.debug(e.getMessage(), e); @@ -270,23 +291,65 @@ assertEquals(600, task.getSftpConnectionTimeout()); assertEquals(5, task.getMaxConnectionRetry()); assertEquals(pathPrefix, task.getPathPrefix()); assertEquals("txt", task.getFileNameExtension()); assertEquals("%03d.%02d.", task.getSequenceFormat()); + assertEquals(Optional.absent(), task.getProxy()); } + @Test + public void testConfigValuesIncludingProxy() + { + // setting embulk config + final String pathPrefix = "/test/testUserPassword"; + String configYaml = "" + + "type: sftp\n" + + "host: " + HOST + "\n" + + "user: " + USERNAME + "\n" + + "path_prefix: " + pathPrefix + "\n" + + "file_ext: txt\n" + + "proxy: \n" + + " type: http\n" + + " host: proxy_host\n" + + " port: 80 \n" + + " user: proxy_user\n" + + " password: proxy_pass\n" + + " command: proxy_command\n" + + "formatter:\n" + + " type: csv\n" + + " newline: CRLF\n" + + " newline_in_field: LF\n" + + " header_line: true\n" + + " charset: UTF-8\n" + + " quote_policy: NONE\n" + + " quote: \"\\\"\"\n" + + " escape: \"\\\\\"\n" + + " null_string: \"\"\n" + + " default_timezone: 'UTC'"; + + ConfigSource config = getConfigFromYaml(configYaml); + PluginTask task = config.loadConfig(PluginTask.class); + + ProxyTask proxy = task.getProxy().get(); + assertEquals("proxy_command", proxy.getCommand().get()); + assertEquals("proxy_host", proxy.getHost().get()); + assertEquals("proxy_user", proxy.getUser().get()); + assertEquals("proxy_pass", proxy.getPassword().get()); + assertEquals(80, proxy.getPort()); + assertEquals(ProxyTask.ProxyType.HTTP, proxy.getType()); + } + // Cases // login(all cases needs host + port) // user + password // user + secret_key_file + secret_key_passphrase // put files // user_directory_is_root // not user_directory_is_root // timeout // 0 second - @Test public void testUserPasswordAndPutToUserDirectoryRoot() { // setting embulk config final String pathPrefix = "/test/testUserPassword"; @@ -316,11 +379,10 @@ List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath())); assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt"))); assertRecordsInFile(String.format("%s/%s001.00.txt", testFolder.getRoot().getAbsolutePath(), pathPrefix)); - } @Test public void testUserSecretKeyFileAndPutToRootDirectory() { @@ -357,10 +419,64 @@ testFolder.getRoot().getAbsolutePath(), pathPrefix)); } @Test + public void testUserSecretKeyFileWithProxy() + { + HttpProxyServer proxyServer = null; + try { + proxyServer = createProxyServer(PROXY_PORT); + + // setting embulk config + final String pathPrefix = "/test/testUserPassword"; + String configYaml = "" + + "type: sftp\n" + + "host: " + HOST + "\n" + + "port: " + PORT + "\n" + + "user: " + USERNAME + "\n" + + "secret_key_file: " + SECRET_KEY_FILE + "\n" + + "secret_key_passphrase: " + SECRET_KEY_PASSPHRASE + "\n" + + "path_prefix: " + testFolder.getRoot().getAbsolutePath() + pathPrefix + "\n" + + "file_ext: txt\n" + + "proxy: \n" + + " type: http\n" + + " host: " + PROXY_HOST + "\n" + + " port: " + PROXY_PORT + " \n" + + " user: " + USERNAME + "\n" + + " password: " + PASSWORD + "\n" + + " command: \n" + + "formatter:\n" + + " type: csv\n" + + " newline: CRLF\n" + + " newline_in_field: LF\n" + + " header_line: true\n" + + " charset: UTF-8\n" + + " quote_policy: NONE\n" + + " quote: \"\\\"\"\n" + + " escape: \"\\\\\"\n" + + " null_string: \"\"\n" + + " default_timezone: 'UTC'"; + + // runner.transaction -> ... + run(configYaml, Optional.<Integer>absent()); + + List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath())); + assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt"))); + + assertRecordsInFile(String.format("%s/%s001.00.txt", + testFolder.getRoot().getAbsolutePath(), + pathPrefix)); + } + finally { + if (proxyServer != null) { + proxyServer.stop(); + } + } + } + + @Test public void testTimeout() { // setting embulk config final String pathPrefix = "/test/testUserPassword"; String configYaml = "" + @@ -390,7 +506,33 @@ exception.expectCause(CoreMatchers.<Throwable>instanceOf(FileSystemException.class)); exception.expectMessage("Could not connect to SFTP server"); // runner.transaction -> ... run(configYaml, Optional.of(60)); // sleep 1 minute while processing + } + + @Test + public void testProxyType() + { + // test valueOf() + assertEquals("http", ProxyTask.ProxyType.valueOf("HTTP").toString()); + assertEquals("socks", ProxyTask.ProxyType.valueOf("SOCKS").toString()); + assertEquals("stream", ProxyTask.ProxyType.valueOf("STREAM").toString()); + try { + ProxyTask.ProxyType.valueOf("non-existing-type"); + } + catch (Exception e) { + assertEquals(IllegalArgumentException.class, e.getClass()); + } + + // test fromString + assertEquals(ProxyTask.ProxyType.HTTP, ProxyTask.ProxyType.fromString("http")); + assertEquals(ProxyTask.ProxyType.SOCKS, ProxyTask.ProxyType.fromString("socks")); + assertEquals(ProxyTask.ProxyType.STREAM, ProxyTask.ProxyType.fromString("stream")); + try { + ProxyTask.ProxyType.fromString("non-existing-type"); + } + catch (Exception e) { + assertEquals(ConfigException.class, e.getClass()); + } } }