src/test/java/org/embulk/output/sftp/TestSftpFileOutputPlugin.java in embulk-output-sftp-0.0.7 vs src/test/java/org/embulk/output/sftp/TestSftpFileOutputPlugin.java in embulk-output-sftp-0.0.8
- old
+ new
@@ -2,11 +2,10 @@
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
-import org.apache.commons.vfs2.FileSystemException;
import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.common.file.virtualfs.VirtualFileSystemFactory;
import org.apache.sshd.server.Command;
import org.apache.sshd.server.SshServer;
import org.apache.sshd.server.auth.password.PasswordAuthenticator;
@@ -28,11 +27,10 @@
import org.embulk.spi.Page;
import org.embulk.spi.PageTestUtils;
import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
import org.embulk.spi.time.Timestamp;
-import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -185,11 +183,11 @@
logger.debug(e.getMessage(), e);
}
return fileNames;
}
- private void run(String configYaml, final Optional<Integer> sleep)
+ private void run(String configYaml)
{
ConfigSource config = getConfigFromYaml(configYaml);
runner.transaction(config, SCHEMA, 1, new Control()
{
@Override
@@ -204,20 +202,14 @@
// true,2,3.0,45,1970-01-01 00:00:00.678000 +0000,{\"k\":\"v\"}
for (Page page : PageTestUtils.buildPage(runtime.getBufferAllocator(), SCHEMA,
true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")),
true, 2L, 3.0D, "45", Timestamp.ofEpochMilli(678L), newMap(newString("k"), newString("v")))) {
pageOutput.add(page);
- if (sleep.isPresent()) {
- Thread.sleep(sleep.get() * 1000);
- }
}
pageOutput.commit();
committed = true;
}
- catch (InterruptedException e) {
- logger.debug(e.getMessage(), e);
- }
finally {
if (!committed) {
pageOutput.abort();
}
pageOutput.close();
@@ -372,11 +364,11 @@
" escape: \"\\\\\"\n" +
" null_string: \"\"\n" +
" default_timezone: 'UTC'";
// runner.transaction -> ...
- run(configYaml, Optional.<Integer>absent());
+ run(configYaml);
List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath()));
assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt")));
assertRecordsInFile(String.format("%s/%s001.00.txt",
testFolder.getRoot().getAbsolutePath(),
@@ -408,11 +400,11 @@
" escape: \"\\\\\"\n" +
" null_string: \"\"\n" +
" default_timezone: 'UTC'";
// runner.transaction -> ...
- run(configYaml, Optional.<Integer>absent());
+ run(configYaml);
List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath()));
assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt")));
assertRecordsInFile(String.format("%s/%s001.00.txt",
@@ -456,11 +448,11 @@
" escape: \"\\\\\"\n" +
" null_string: \"\"\n" +
" default_timezone: 'UTC'";
// runner.transaction -> ...
- run(configYaml, Optional.<Integer>absent());
+ run(configYaml);
List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath()));
assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt")));
assertRecordsInFile(String.format("%s/%s001.00.txt",
@@ -470,45 +462,9 @@
finally {
if (proxyServer != null) {
proxyServer.stop();
}
}
- }
-
- @Test
- public void testTimeout()
- {
- // setting embulk config
- final String pathPrefix = "/test/testUserPassword";
- String configYaml = "" +
- "type: sftp\n" +
- "host: " + HOST + "\n" +
- "port: " + PORT + "\n" +
- "user: " + USERNAME + "\n" +
- "secret_key_file: " + SECRET_KEY_FILE + "\n" +
- "secret_key_passphrase: " + SECRET_KEY_PASSPHRASE + "\n" +
- "path_prefix: " + testFolder.getRoot().getAbsolutePath() + pathPrefix + "\n" +
- "timeout: 1\n" +
- "file_ext: txt\n" +
- "formatter:\n" +
- " type: csv\n" +
- " newline: CRLF\n" +
- " newline_in_field: LF\n" +
- " header_line: true\n" +
- " charset: UTF-8\n" +
- " quote_policy: NONE\n" +
- " quote: \"\\\"\"\n" +
- " escape: \"\\\\\"\n" +
- " null_string: \"\"\n" +
- " default_timezone: 'UTC'";
-
- // exception
- exception.expect(RuntimeException.class);
- exception.expectCause(CoreMatchers.<Throwable>instanceOf(FileSystemException.class));
- exception.expectMessage("Could not connect to SFTP server");
-
- // runner.transaction -> ...
- run(configYaml, Optional.of(60)); // sleep 1 minute while processing
}
@Test
public void testProxyType()
{