src/test/java/org/embulk/output/sftp/TestSftpFileOutputPlugin.java in embulk-output-sftp-0.1.10 vs src/test/java/org/embulk/output/sftp/TestSftpFileOutputPlugin.java in embulk-output-sftp-0.1.11

- old
+ new

@@ -2,10 +2,13 @@ import com.google.common.base.Charsets; import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.io.Resources; +import com.jcraft.jsch.JSchException; +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSystemException; import org.apache.sshd.common.NamedFactory; import org.apache.sshd.common.file.virtualfs.VirtualFileSystemFactory; import org.apache.sshd.server.Command; import org.apache.sshd.server.SshServer; import org.apache.sshd.server.auth.password.PasswordAuthenticator; @@ -27,41 +30,60 @@ import org.embulk.spi.Page; import org.embulk.spi.PageTestUtils; import org.embulk.spi.Schema; import org.embulk.spi.TransactionalPageOutput; import org.embulk.spi.time.Timestamp; +import org.hamcrest.CoreMatchers; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.littleshoot.proxy.HttpProxyServer; import org.littleshoot.proxy.impl.DefaultHttpProxyServer; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; +import java.io.BufferedOutputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.security.PublicKey; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeoutException; import static com.google.common.io.Files.readLines; import static org.embulk.spi.type.Types.BOOLEAN; import static org.embulk.spi.type.Types.DOUBLE; import static org.embulk.spi.type.Types.JSON; import static org.embulk.spi.type.Types.LONG; import static org.embulk.spi.type.Types.STRING; import static org.embulk.spi.type.Types.TIMESTAMP; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.hasItem; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.msgpack.value.ValueFactory.newMap; import static org.msgpack.value.ValueFactory.newString; public class TestSftpFileOutputPlugin { @@ -92,10 +114,12 @@ .add("_c3", STRING) .add("_c4", TIMESTAMP) .add("_c5", JSON) .build(); + private final String defaultPathPrefix = "/test/testUserPassword"; + @Before public void createResources() throws IOException { // setup the plugin @@ -199,12 +223,12 @@ // Result: // _c0,_c1,_c2,_c3,_c4,_c5 // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000,{\"k\":\"v\"} // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000,{\"k\":\"v\"} for (Page page : PageTestUtils.buildPage(runtime.getBufferAllocator(), SCHEMA, - true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")), - true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")))) { + true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")), + true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")))) { pageOutput.add(page); } pageOutput.commit(); committed = true; } @@ -221,11 +245,11 @@ private void assertRecordsInFile(String filePath) { try { List<String> lines = readLines(new File(filePath), - Charsets.UTF_8); + Charsets.UTF_8); for (int i = 0; i < lines.size(); i++) { String[] record = lines.get(i).split(","); if (i == 0) { for (int j = 0; j <= 4; j++) { assertEquals("_c" + j, record[j]); @@ -400,12 +424,12 @@ run(configYaml); List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath())); assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt"))); assertRecordsInFile(String.format("%s/%s001.00.txt", - testFolder.getRoot().getAbsolutePath(), - pathPrefix)); + testFolder.getRoot().getAbsolutePath(), + pathPrefix)); } @Test public void testUserSecretKeyFileAndPutToRootDirectory() { @@ -437,20 +461,20 @@ List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath())); assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt"))); assertRecordsInFile(String.format("%s/%s001.00.txt", - testFolder.getRoot().getAbsolutePath(), - pathPrefix)); + testFolder.getRoot().getAbsolutePath(), + pathPrefix)); } @Test public void testUserSecretKeyFileWithProxy() { HttpProxyServer proxyServer = null; try { - proxyServer = createProxyServer(PROXY_PORT); + proxyServer = createProxyServer(PROXY_PORT); // setting embulk config final String pathPrefix = "/test/testUserPassword"; String configYaml = "" + "type: sftp\n" + @@ -519,7 +543,352 @@ ProxyTask.ProxyType.fromString("non-existing-type"); } catch (Exception e) { assertEquals(ConfigException.class, e.getClass()); } + } + + @Test + public void testUploadFileWithRetry() throws IOException + { + SftpFileOutputPlugin.PluginTask task = defaultTask(); + SftpUtils utils = Mockito.spy(new SftpUtils(task)); + + // append throws exception + Mockito.doThrow(new IOException("Fake Exception")) + .doCallRealMethod() + .when(utils) + .appendFile(Mockito.any(File.class), Mockito.any(FileObject.class), Mockito.any(BufferedOutputStream.class)); + + byte[] expected = randBytes(8); + File input = writeBytesToInputFile(expected); + utils.uploadFile(input, defaultPathPrefix); + + // assert retry and recover + Mockito.verify(utils, Mockito.times(2)).appendFile(Mockito.any(File.class), Mockito.any(FileObject.class), Mockito.any(BufferedOutputStream.class)); + List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath())); + assertThat(fileList, hasItem(containsString(defaultPathPrefix))); + + // assert uploaded file + String filePath = testFolder.getRoot().getAbsolutePath() + File.separator + defaultPathPrefix; + File output = new File(filePath); + InputStream in = new FileInputStream(output); + byte[] actual = new byte[8]; + in.read(actual); + in.close(); + Assert.assertArrayEquals(expected, actual); + } + + @Test + public void testUploadFileRetryAndGiveUp() throws IOException + { + SftpFileOutputPlugin.PluginTask task = defaultTask(); + SftpUtils utils = Mockito.spy(new SftpUtils(task)); + + // append throws exception + Mockito.doThrow(new IOException("Fake IOException")) + .when(utils) + .appendFile(Mockito.any(File.class), Mockito.any(FileObject.class), Mockito.any(BufferedOutputStream.class)); + + byte[] expected = randBytes(8); + File input = writeBytesToInputFile(expected); + try { + utils.uploadFile(input, defaultPathPrefix); + fail("Should not reach here"); + } + catch (Exception e) { + assertThat(e, CoreMatchers.<Exception>instanceOf(RuntimeException.class)); + assertThat(e.getCause(), CoreMatchers.<Throwable>instanceOf(IOException.class)); + assertEquals(e.getCause().getMessage(), "Fake IOException"); + // assert used up all retries + Mockito.verify(utils, Mockito.times(task.getMaxConnectionRetry() + 1)).appendFile(Mockito.any(File.class), Mockito.any(FileObject.class), Mockito.any(BufferedOutputStream.class)); + assertEmptyUploadedFile(defaultPathPrefix); + } + } + + @Test + public void testUploadFileNotRetryAuthFail() throws IOException + { + SftpFileOutputPlugin.PluginTask task = defaultTask(); + SftpUtils utils = Mockito.spy(new SftpUtils(task)); + + // append throws exception + Mockito.doThrow(new IOException(new JSchException("USERAUTH fail"))) + .doCallRealMethod() + .when(utils) + .appendFile(Mockito.any(File.class), Mockito.any(FileObject.class), Mockito.any(BufferedOutputStream.class)); + + byte[] expected = randBytes(8); + File input = writeBytesToInputFile(expected); + try { + utils.uploadFile(input, defaultPathPrefix); + fail("Should not reach here"); + } + catch (Exception e) { + assertThat(e, CoreMatchers.<Exception>instanceOf(RuntimeException.class)); + assertThat(e.getCause(), CoreMatchers.<Throwable>instanceOf(IOException.class)); + assertThat(e.getCause().getCause(), CoreMatchers.<Throwable>instanceOf(JSchException.class)); + assertEquals(e.getCause().getCause().getMessage(), "USERAUTH fail"); + // assert no retry + Mockito.verify(utils, Mockito.times(1)).appendFile(Mockito.any(File.class), Mockito.any(FileObject.class), Mockito.any(BufferedOutputStream.class)); + assertEmptyUploadedFile(defaultPathPrefix); + } + } + + @Test + public void testAppendFile() throws IOException + { + SftpFileOutputPlugin.PluginTask task = defaultTask(); + SftpUtils utils = new SftpUtils(task); + + FileObject remoteFile = utils.resolve(defaultPathPrefix); + BufferedOutputStream remoteOutput = utils.openStream(remoteFile); + // 1st append + byte[] expected = randBytes(16); + utils.appendFile(writeBytesToInputFile(Arrays.copyOfRange(expected, 0, 8)), remoteFile, remoteOutput); + // 2nd append + utils.appendFile(writeBytesToInputFile(Arrays.copyOfRange(expected, 8, 16)), remoteFile, remoteOutput); + remoteOutput.close(); + remoteFile.close(); + + // assert uploaded file + String filePath = testFolder.getRoot().getAbsolutePath() + File.separator + defaultPathPrefix; + File output = new File(filePath); + InputStream in = new FileInputStream(output); + byte[] actual = new byte[16]; + in.read(actual); + in.close(); + assertArrayEquals(expected, actual); + } + + @Test + public void testAppendFileAndTimeOut() throws IOException + { + SftpFileOutputPlugin.PluginTask task = defaultTask(); + SftpUtils utils = new SftpUtils(task); + utils.writeTimeout = 1; // 1s time-out + + byte[] expected = randBytes(8); + FileObject remoteFile = utils.resolve(defaultPathPrefix); + BufferedOutputStream remoteOutput = Mockito.spy(utils.openStream(remoteFile)); + + Mockito.doAnswer(new Answer() + { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable + { + Thread.sleep(2000); // 2s + return null; + } + }).when(remoteOutput).write(Mockito.any(byte[].class), Mockito.eq(0), Mockito.eq(8)); + + try { + utils.appendFile(writeBytesToInputFile(expected), remoteFile, remoteOutput); + fail("Should not reach here"); + } + catch (IOException e) { + assertThat(e.getCause(), CoreMatchers.<Throwable>instanceOf(TimeoutException.class)); + assertNull(e.getCause().getMessage()); + } + } + + @Test + public void testRenameFileWithRetry() throws IOException + { + SftpFileOutputPlugin.PluginTask task = defaultTask(); + SftpUtils utils = Mockito.spy(new SftpUtils(task)); + + byte[] expected = randBytes(8); + File input = writeBytesToInputFile(expected); + utils.uploadFile(input, defaultPathPrefix); + + Mockito.doThrow(new RuntimeException("Fake Exception")) + .doCallRealMethod() + .when(utils).resolve(Mockito.eq(defaultPathPrefix)); + + utils.renameFile(defaultPathPrefix, "/after"); + Mockito.verify(utils, Mockito.times(1 + 2)).resolve(Mockito.any(String.class)); // 1 fail + 2 success + + // assert renamed file + String filePath = testFolder.getRoot().getAbsolutePath() + "/after"; + File output = new File(filePath); + InputStream in = new FileInputStream(output); + byte[] actual = new byte[8]; + in.read(actual); + in.close(); + Assert.assertArrayEquals(expected, actual); + } + + @Test + public void testDeleteFile() throws IOException + { + SftpFileOutputPlugin.PluginTask task = defaultTask(); + SftpUtils utils = new SftpUtils(task); + + // upload file + byte[] expected = randBytes(8); + File input = writeBytesToInputFile(expected); + utils.uploadFile(input, defaultPathPrefix); + + FileObject target = utils.resolve(defaultPathPrefix); + assertTrue("File should exists", target.exists()); + + utils.deleteFile(defaultPathPrefix); + + target = utils.resolve(defaultPathPrefix); + assertFalse("File should be deleted", target.exists()); + } + + @Test + public void testDeleteFileNotExists() + { + SftpFileOutputPlugin.PluginTask task = defaultTask(); + SftpUtils utils = new SftpUtils(task); + utils.deleteFile("/not/exists"); + } + + @Test + public void testResolveWithoutRetry() + { + SftpFileOutputPlugin.PluginTask task = defaultTask(); + SftpUtils utils = Mockito.spy(new SftpUtils(task)); + + Mockito.doThrow(new ConfigException("Fake ConfigException")) + .doCallRealMethod() + .when(utils).getSftpFileUri(Mockito.eq(defaultPathPrefix)); + + try { + utils.resolve(defaultPathPrefix); + fail("Should not reach here"); + } + catch (Exception e) { + assertThat(e, CoreMatchers.<Exception>instanceOf(ConfigException.class)); + // assert retry + Mockito.verify(utils, Mockito.times(1)).getSftpFileUri(Mockito.eq(defaultPathPrefix)); + } + } + + @Test + public void testOpenStreamWithRetry() throws FileSystemException + { + SftpFileOutputPlugin.PluginTask task = defaultTask(); + SftpUtils utils = new SftpUtils(task); + + FileObject mock = Mockito.spy(utils.resolve(defaultPathPrefix)); + Mockito.doThrow(new FileSystemException("Fake FileSystemException")) + .doCallRealMethod() + .when(mock).getContent(); + + OutputStream stream = utils.openStream(mock); + assertNotNull(stream); + Mockito.verify(mock, Mockito.times(2)).getContent(); + } + + @Test + public void testNewSftpFileExists() throws IOException + { + SftpFileOutputPlugin.PluginTask task = defaultTask(); + SftpUtils utils = new SftpUtils(task); + + byte[] expected = randBytes(8); + File input = writeBytesToInputFile(expected); + utils.uploadFile(input, defaultPathPrefix); + + FileObject file = utils.resolve(defaultPathPrefix); + assertTrue("File should exists", file.exists()); + + file = utils.newSftpFile(utils.getSftpFileUri(defaultPathPrefix)); + assertFalse("File should be deleted", file.exists()); + } + + @Test + public void testNewSftpFileParentNotExists() throws FileSystemException + { + SftpFileOutputPlugin.PluginTask task = defaultTask(); + SftpUtils utils = new SftpUtils(task); + + String parentPath = defaultPathPrefix.substring(0, defaultPathPrefix.lastIndexOf('/')); + FileObject parent = utils.resolve(parentPath); + boolean exists = parent.exists(); + logger.info("Resolving parent path: {}, exists? {}", parentPath, exists); + assertFalse("Parent folder should not exist", exists); + + utils.newSftpFile(utils.getSftpFileUri(defaultPathPrefix)); + + parent = utils.resolve(parentPath); + exists = parent.exists(); + logger.info("Resolving (again) parent path: {}, exists? {}", parentPath, exists); + assertTrue("Parent folder must be created", parent.exists()); + } + + @Test + public void testSftpFileOutputNextFile() + { + SftpFileOutputPlugin.PluginTask task = defaultTask(); + + SftpLocalFileOutput localFileOutput = new SftpLocalFileOutput(task, 1); + localFileOutput.nextFile(); + assertNotNull("Must use local temp file", localFileOutput.getLocalOutput()); + assertNull("Must not use remote temp file", localFileOutput.getRemoteOutput()); + localFileOutput.close(); + + SftpRemoteFileOutput remoteFileOutput = new SftpRemoteFileOutput(task, 1); + remoteFileOutput.nextFile(); + assertNull("Must not use local temp file", remoteFileOutput.getLocalOutput()); + assertNotNull("Must use remote temp file", remoteFileOutput.getRemoteOutput()); + remoteFileOutput.close(); + } + + private String defaultConfig(final String pathPrefix) + { + return "type: sftp\n" + + "host: " + HOST + "\n" + + "port: " + PORT + "\n" + + "user: " + USERNAME + "\n" + + "password: " + PASSWORD + "\n" + + "path_prefix: " + pathPrefix + "\n" + + "file_ext: txt\n" + + "formatter:\n" + + " type: csv\n" + + " newline: CRLF\n" + + " newline_in_field: LF\n" + + " header_line: true\n" + + " charset: UTF-8\n" + + " quote_policy: NONE\n" + + " quote: \"\\\"\"\n" + + " escape: \"\\\\\"\n" + + " null_string: \"\"\n" + + " default_timezone: 'UTC'"; + } + + private PluginTask defaultTask() + { + ConfigSource config = getConfigFromYaml(defaultConfig(defaultPathPrefix)); + return config.loadConfig(SftpFileOutputPlugin.PluginTask.class); + } + + private byte[] randBytes(final int len) + { + byte[] bytes = new byte[len]; + new Random().nextBytes(bytes); + return bytes; + } + + private File writeBytesToInputFile(final byte[] expected) throws IOException + { + File input = File.createTempFile("anything", ".dat"); + OutputStream out = new BufferedOutputStream(new FileOutputStream(input)); + out.write(expected); + out.close(); + return input; + } + + private void assertEmptyUploadedFile(final String pathPrefix) + { + List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath())); + assertThat(fileList, hasItem(containsString(pathPrefix))); + + String filePath = testFolder.getRoot().getAbsolutePath() + File.separator + pathPrefix; + File output = new File(filePath); + assertEquals(0, output.length()); } }