src/test/java/org/embulk/output/sftp/TestSftpFileOutputPlugin.java in embulk-output-sftp-0.1.10 vs src/test/java/org/embulk/output/sftp/TestSftpFileOutputPlugin.java in embulk-output-sftp-0.1.11
- old
+ new
@@ -2,10 +2,13 @@
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
+import com.jcraft.jsch.JSchException;
+import org.apache.commons.vfs2.FileObject;
+import org.apache.commons.vfs2.FileSystemException;
import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.common.file.virtualfs.VirtualFileSystemFactory;
import org.apache.sshd.server.Command;
import org.apache.sshd.server.SshServer;
import org.apache.sshd.server.auth.password.PasswordAuthenticator;
@@ -27,41 +30,60 @@
import org.embulk.spi.Page;
import org.embulk.spi.PageTestUtils;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
import org.embulk.spi.time.Timestamp;
+import org.hamcrest.CoreMatchers;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.littleshoot.proxy.HttpProxyServer;
import org.littleshoot.proxy.impl.DefaultHttpProxyServer;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
+import java.io.BufferedOutputStream;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.PublicKey;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
import static com.google.common.io.Files.readLines;
import static org.embulk.spi.type.Types.BOOLEAN;
import static org.embulk.spi.type.Types.DOUBLE;
import static org.embulk.spi.type.Types.JSON;
import static org.embulk.spi.type.Types.LONG;
import static org.embulk.spi.type.Types.STRING;
import static org.embulk.spi.type.Types.TIMESTAMP;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItem;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.msgpack.value.ValueFactory.newMap;
import static org.msgpack.value.ValueFactory.newString;
public class TestSftpFileOutputPlugin
{
@@ -92,10 +114,12 @@
.add("_c3", STRING)
.add("_c4", TIMESTAMP)
.add("_c5", JSON)
.build();
+ private final String defaultPathPrefix = "/test/testUserPassword";
+
@Before
public void createResources()
throws IOException
{
// setup the plugin
@@ -199,12 +223,12 @@
// Result:
// _c0,_c1,_c2,_c3,_c4,_c5
// true,2,3.0,45,1970-01-01 00:00:00.678000 +0000,{\"k\":\"v\"}
// true,2,3.0,45,1970-01-01 00:00:00.678000 +0000,{\"k\":\"v\"}
for (Page page : PageTestUtils.buildPage(runtime.getBufferAllocator(), SCHEMA,
- true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")),
- true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")))) {
+ true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")),
+ true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")))) {
pageOutput.add(page);
}
pageOutput.commit();
committed = true;
}
@@ -221,11 +245,11 @@
private void assertRecordsInFile(String filePath)
{
try {
List<String> lines = readLines(new File(filePath),
- Charsets.UTF_8);
+ Charsets.UTF_8);
for (int i = 0; i < lines.size(); i++) {
String[] record = lines.get(i).split(",");
if (i == 0) {
for (int j = 0; j <= 4; j++) {
assertEquals("_c" + j, record[j]);
@@ -400,12 +424,12 @@
run(configYaml);
List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath()));
assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt")));
assertRecordsInFile(String.format("%s/%s001.00.txt",
- testFolder.getRoot().getAbsolutePath(),
- pathPrefix));
+ testFolder.getRoot().getAbsolutePath(),
+ pathPrefix));
}
@Test
public void testUserSecretKeyFileAndPutToRootDirectory()
{
@@ -437,20 +461,20 @@
List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath()));
assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt")));
assertRecordsInFile(String.format("%s/%s001.00.txt",
- testFolder.getRoot().getAbsolutePath(),
- pathPrefix));
+ testFolder.getRoot().getAbsolutePath(),
+ pathPrefix));
}
@Test
public void testUserSecretKeyFileWithProxy()
{
HttpProxyServer proxyServer = null;
try {
- proxyServer = createProxyServer(PROXY_PORT);
+ proxyServer = createProxyServer(PROXY_PORT);
// setting embulk config
final String pathPrefix = "/test/testUserPassword";
String configYaml = "" +
"type: sftp\n" +
@@ -519,7 +543,352 @@
ProxyTask.ProxyType.fromString("non-existing-type");
}
catch (Exception e) {
assertEquals(ConfigException.class, e.getClass());
}
+ }
+
+ @Test
+ public void testUploadFileWithRetry() throws IOException
+ {
+ SftpFileOutputPlugin.PluginTask task = defaultTask();
+ SftpUtils utils = Mockito.spy(new SftpUtils(task));
+
+ // append throws exception
+ Mockito.doThrow(new IOException("Fake Exception"))
+ .doCallRealMethod()
+ .when(utils)
+ .appendFile(Mockito.any(File.class), Mockito.any(FileObject.class), Mockito.any(BufferedOutputStream.class));
+
+ byte[] expected = randBytes(8);
+ File input = writeBytesToInputFile(expected);
+ utils.uploadFile(input, defaultPathPrefix);
+
+ // assert retry and recover
+ Mockito.verify(utils, Mockito.times(2)).appendFile(Mockito.any(File.class), Mockito.any(FileObject.class), Mockito.any(BufferedOutputStream.class));
+ List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath()));
+ assertThat(fileList, hasItem(containsString(defaultPathPrefix)));
+
+ // assert uploaded file
+ String filePath = testFolder.getRoot().getAbsolutePath() + File.separator + defaultPathPrefix;
+ File output = new File(filePath);
+ InputStream in = new FileInputStream(output);
+ byte[] actual = new byte[8];
+ in.read(actual);
+ in.close();
+ Assert.assertArrayEquals(expected, actual);
+ }
+
+ @Test
+ public void testUploadFileRetryAndGiveUp() throws IOException
+ {
+ SftpFileOutputPlugin.PluginTask task = defaultTask();
+ SftpUtils utils = Mockito.spy(new SftpUtils(task));
+
+ // append throws exception
+ Mockito.doThrow(new IOException("Fake IOException"))
+ .when(utils)
+ .appendFile(Mockito.any(File.class), Mockito.any(FileObject.class), Mockito.any(BufferedOutputStream.class));
+
+ byte[] expected = randBytes(8);
+ File input = writeBytesToInputFile(expected);
+ try {
+ utils.uploadFile(input, defaultPathPrefix);
+ fail("Should not reach here");
+ }
+ catch (Exception e) {
+ assertThat(e, CoreMatchers.<Exception>instanceOf(RuntimeException.class));
+ assertThat(e.getCause(), CoreMatchers.<Throwable>instanceOf(IOException.class));
+ assertEquals(e.getCause().getMessage(), "Fake IOException");
+ // assert used up all retries
+ Mockito.verify(utils, Mockito.times(task.getMaxConnectionRetry() + 1)).appendFile(Mockito.any(File.class), Mockito.any(FileObject.class), Mockito.any(BufferedOutputStream.class));
+ assertEmptyUploadedFile(defaultPathPrefix);
+ }
+ }
+
+ @Test
+ public void testUploadFileNotRetryAuthFail() throws IOException
+ {
+ SftpFileOutputPlugin.PluginTask task = defaultTask();
+ SftpUtils utils = Mockito.spy(new SftpUtils(task));
+
+ // append throws exception
+ Mockito.doThrow(new IOException(new JSchException("USERAUTH fail")))
+ .doCallRealMethod()
+ .when(utils)
+ .appendFile(Mockito.any(File.class), Mockito.any(FileObject.class), Mockito.any(BufferedOutputStream.class));
+
+ byte[] expected = randBytes(8);
+ File input = writeBytesToInputFile(expected);
+ try {
+ utils.uploadFile(input, defaultPathPrefix);
+ fail("Should not reach here");
+ }
+ catch (Exception e) {
+ assertThat(e, CoreMatchers.<Exception>instanceOf(RuntimeException.class));
+ assertThat(e.getCause(), CoreMatchers.<Throwable>instanceOf(IOException.class));
+ assertThat(e.getCause().getCause(), CoreMatchers.<Throwable>instanceOf(JSchException.class));
+ assertEquals(e.getCause().getCause().getMessage(), "USERAUTH fail");
+ // assert no retry
+ Mockito.verify(utils, Mockito.times(1)).appendFile(Mockito.any(File.class), Mockito.any(FileObject.class), Mockito.any(BufferedOutputStream.class));
+ assertEmptyUploadedFile(defaultPathPrefix);
+ }
+ }
+
+ @Test
+ public void testAppendFile() throws IOException
+ {
+ SftpFileOutputPlugin.PluginTask task = defaultTask();
+ SftpUtils utils = new SftpUtils(task);
+
+ FileObject remoteFile = utils.resolve(defaultPathPrefix);
+ BufferedOutputStream remoteOutput = utils.openStream(remoteFile);
+ // 1st append
+ byte[] expected = randBytes(16);
+ utils.appendFile(writeBytesToInputFile(Arrays.copyOfRange(expected, 0, 8)), remoteFile, remoteOutput);
+ // 2nd append
+ utils.appendFile(writeBytesToInputFile(Arrays.copyOfRange(expected, 8, 16)), remoteFile, remoteOutput);
+ remoteOutput.close();
+ remoteFile.close();
+
+ // assert uploaded file
+ String filePath = testFolder.getRoot().getAbsolutePath() + File.separator + defaultPathPrefix;
+ File output = new File(filePath);
+ InputStream in = new FileInputStream(output);
+ byte[] actual = new byte[16];
+ in.read(actual);
+ in.close();
+ assertArrayEquals(expected, actual);
+ }
+
+ @Test
+ public void testAppendFileAndTimeOut() throws IOException
+ {
+ SftpFileOutputPlugin.PluginTask task = defaultTask();
+ SftpUtils utils = new SftpUtils(task);
+ utils.writeTimeout = 1; // 1s time-out
+
+ byte[] expected = randBytes(8);
+ FileObject remoteFile = utils.resolve(defaultPathPrefix);
+ BufferedOutputStream remoteOutput = Mockito.spy(utils.openStream(remoteFile));
+
+ Mockito.doAnswer(new Answer()
+ {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable
+ {
+ Thread.sleep(2000); // 2s
+ return null;
+ }
+ }).when(remoteOutput).write(Mockito.any(byte[].class), Mockito.eq(0), Mockito.eq(8));
+
+ try {
+ utils.appendFile(writeBytesToInputFile(expected), remoteFile, remoteOutput);
+ fail("Should not reach here");
+ }
+ catch (IOException e) {
+ assertThat(e.getCause(), CoreMatchers.<Throwable>instanceOf(TimeoutException.class));
+ assertNull(e.getCause().getMessage());
+ }
+ }
+
+ @Test
+ public void testRenameFileWithRetry() throws IOException
+ {
+ SftpFileOutputPlugin.PluginTask task = defaultTask();
+ SftpUtils utils = Mockito.spy(new SftpUtils(task));
+
+ byte[] expected = randBytes(8);
+ File input = writeBytesToInputFile(expected);
+ utils.uploadFile(input, defaultPathPrefix);
+
+ Mockito.doThrow(new RuntimeException("Fake Exception"))
+ .doCallRealMethod()
+ .when(utils).resolve(Mockito.eq(defaultPathPrefix));
+
+ utils.renameFile(defaultPathPrefix, "/after");
+ Mockito.verify(utils, Mockito.times(1 + 2)).resolve(Mockito.any(String.class)); // 1 fail + 2 success
+
+ // assert renamed file
+ String filePath = testFolder.getRoot().getAbsolutePath() + "/after";
+ File output = new File(filePath);
+ InputStream in = new FileInputStream(output);
+ byte[] actual = new byte[8];
+ in.read(actual);
+ in.close();
+ Assert.assertArrayEquals(expected, actual);
+ }
+
+ @Test
+ public void testDeleteFile() throws IOException
+ {
+ SftpFileOutputPlugin.PluginTask task = defaultTask();
+ SftpUtils utils = new SftpUtils(task);
+
+ // upload file
+ byte[] expected = randBytes(8);
+ File input = writeBytesToInputFile(expected);
+ utils.uploadFile(input, defaultPathPrefix);
+
+ FileObject target = utils.resolve(defaultPathPrefix);
+ assertTrue("File should exists", target.exists());
+
+ utils.deleteFile(defaultPathPrefix);
+
+ target = utils.resolve(defaultPathPrefix);
+ assertFalse("File should be deleted", target.exists());
+ }
+
+ @Test
+ public void testDeleteFileNotExists()
+ {
+ SftpFileOutputPlugin.PluginTask task = defaultTask();
+ SftpUtils utils = new SftpUtils(task);
+ utils.deleteFile("/not/exists");
+ }
+
+ @Test
+ public void testResolveWithoutRetry()
+ {
+ SftpFileOutputPlugin.PluginTask task = defaultTask();
+ SftpUtils utils = Mockito.spy(new SftpUtils(task));
+
+ Mockito.doThrow(new ConfigException("Fake ConfigException"))
+ .doCallRealMethod()
+ .when(utils).getSftpFileUri(Mockito.eq(defaultPathPrefix));
+
+ try {
+ utils.resolve(defaultPathPrefix);
+ fail("Should not reach here");
+ }
+ catch (Exception e) {
+ assertThat(e, CoreMatchers.<Exception>instanceOf(ConfigException.class));
+ // assert retry
+ Mockito.verify(utils, Mockito.times(1)).getSftpFileUri(Mockito.eq(defaultPathPrefix));
+ }
+ }
+
+ @Test
+ public void testOpenStreamWithRetry() throws FileSystemException
+ {
+ SftpFileOutputPlugin.PluginTask task = defaultTask();
+ SftpUtils utils = new SftpUtils(task);
+
+ FileObject mock = Mockito.spy(utils.resolve(defaultPathPrefix));
+ Mockito.doThrow(new FileSystemException("Fake FileSystemException"))
+ .doCallRealMethod()
+ .when(mock).getContent();
+
+ OutputStream stream = utils.openStream(mock);
+ assertNotNull(stream);
+ Mockito.verify(mock, Mockito.times(2)).getContent();
+ }
+
+ @Test
+ public void testNewSftpFileExists() throws IOException
+ {
+ SftpFileOutputPlugin.PluginTask task = defaultTask();
+ SftpUtils utils = new SftpUtils(task);
+
+ byte[] expected = randBytes(8);
+ File input = writeBytesToInputFile(expected);
+ utils.uploadFile(input, defaultPathPrefix);
+
+ FileObject file = utils.resolve(defaultPathPrefix);
+ assertTrue("File should exists", file.exists());
+
+ file = utils.newSftpFile(utils.getSftpFileUri(defaultPathPrefix));
+ assertFalse("File should be deleted", file.exists());
+ }
+
+ @Test
+ public void testNewSftpFileParentNotExists() throws FileSystemException
+ {
+ SftpFileOutputPlugin.PluginTask task = defaultTask();
+ SftpUtils utils = new SftpUtils(task);
+
+ String parentPath = defaultPathPrefix.substring(0, defaultPathPrefix.lastIndexOf('/'));
+ FileObject parent = utils.resolve(parentPath);
+ boolean exists = parent.exists();
+ logger.info("Resolving parent path: {}, exists? {}", parentPath, exists);
+ assertFalse("Parent folder should not exist", exists);
+
+ utils.newSftpFile(utils.getSftpFileUri(defaultPathPrefix));
+
+ parent = utils.resolve(parentPath);
+ exists = parent.exists();
+ logger.info("Resolving (again) parent path: {}, exists? {}", parentPath, exists);
+ assertTrue("Parent folder must be created", parent.exists());
+ }
+
+ @Test
+ public void testSftpFileOutputNextFile()
+ {
+ SftpFileOutputPlugin.PluginTask task = defaultTask();
+
+ SftpLocalFileOutput localFileOutput = new SftpLocalFileOutput(task, 1);
+ localFileOutput.nextFile();
+ assertNotNull("Must use local temp file", localFileOutput.getLocalOutput());
+ assertNull("Must not use remote temp file", localFileOutput.getRemoteOutput());
+ localFileOutput.close();
+
+ SftpRemoteFileOutput remoteFileOutput = new SftpRemoteFileOutput(task, 1);
+ remoteFileOutput.nextFile();
+ assertNull("Must not use local temp file", remoteFileOutput.getLocalOutput());
+ assertNotNull("Must use remote temp file", remoteFileOutput.getRemoteOutput());
+ remoteFileOutput.close();
+ }
+
+ private String defaultConfig(final String pathPrefix)
+ {
+ return "type: sftp\n" +
+ "host: " + HOST + "\n" +
+ "port: " + PORT + "\n" +
+ "user: " + USERNAME + "\n" +
+ "password: " + PASSWORD + "\n" +
+ "path_prefix: " + pathPrefix + "\n" +
+ "file_ext: txt\n" +
+ "formatter:\n" +
+ " type: csv\n" +
+ " newline: CRLF\n" +
+ " newline_in_field: LF\n" +
+ " header_line: true\n" +
+ " charset: UTF-8\n" +
+ " quote_policy: NONE\n" +
+ " quote: \"\\\"\"\n" +
+ " escape: \"\\\\\"\n" +
+ " null_string: \"\"\n" +
+ " default_timezone: 'UTC'";
+ }
+
+ private PluginTask defaultTask()
+ {
+ ConfigSource config = getConfigFromYaml(defaultConfig(defaultPathPrefix));
+ return config.loadConfig(SftpFileOutputPlugin.PluginTask.class);
+ }
+
+ private byte[] randBytes(final int len)
+ {
+ byte[] bytes = new byte[len];
+ new Random().nextBytes(bytes);
+ return bytes;
+ }
+
+ private File writeBytesToInputFile(final byte[] expected) throws IOException
+ {
+ File input = File.createTempFile("anything", ".dat");
+ OutputStream out = new BufferedOutputStream(new FileOutputStream(input));
+ out.write(expected);
+ out.close();
+ return input;
+ }
+
+ private void assertEmptyUploadedFile(final String pathPrefix)
+ {
+ List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath()));
+ assertThat(fileList, hasItem(containsString(pathPrefix)));
+
+ String filePath = testFolder.getRoot().getAbsolutePath() + File.separator + pathPrefix;
+ File output = new File(filePath);
+ assertEquals(0, output.length());
}
}