src/test/java/org/embulk/input/TestFtpFileInputPlugin.java in embulk-input-ftp-0.1.6 vs src/test/java/org/embulk/input/TestFtpFileInputPlugin.java in embulk-input-ftp-0.2.0

- old
+ new

@@ -1,5 +1,313 @@ package org.embulk.input; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.embulk.EmbulkTestRuntime; +import org.embulk.config.ConfigDiff; +import org.embulk.config.ConfigSource; +import org.embulk.config.TaskReport; +import org.embulk.config.TaskSource; +import org.embulk.input.FtpFileInputPlugin.PluginTask; +import org.embulk.spi.Exec; +import org.embulk.spi.FileInputPlugin; +import org.embulk.spi.FileInputRunner; +import org.embulk.spi.InputPlugin; +import org.embulk.spi.Schema; +import org.embulk.spi.TestPageBuilderReader; +import org.embulk.spi.TestPageBuilderReader.MockPageOutput; +import org.embulk.spi.util.Pages; +import org.embulk.standards.CsvParserPlugin; +import org.embulk.util.ftp.SSLPlugins; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + public class TestFtpFileInputPlugin { + private static String FTP_TEST_HOST; + private static Integer FTP_TEST_PORT; + private static Integer FTP_TEST_SSL_PORT; + private static String FTP_TEST_USER; + private static String FTP_TEST_PASSWORD; + private static String FTP_TEST_SSL_TRUSTED_CA_CERT_FILE; + private static String FTP_TEST_SSL_TRUSTED_CA_CERT_DATA; + private static String FTP_TEST_DIRECTORY; + private static String FTP_TEST_PATH_PREFIX; + private FileInputRunner runner; + private TestPageBuilderReader.MockPageOutput output; + + /* + * This test case requires environment variables + * FTP_TEST_HOST + * FTP_TEST_USER + * FTP_TEST_PASSWORD + * FTP_TEST_SSL_TRUSTED_CA_CERT_FILE + */ + @BeforeClass + public static void initializeConstant() + { + final Map<String, String> env = System.getenv(); + FTP_TEST_HOST = env.getOrDefault("FTP_TEST_HOST", "localhost"); + FTP_TEST_PORT = Integer.valueOf(env.getOrDefault("FTP_TEST_PORT", "11021")); + FTP_TEST_SSL_PORT = Integer.valueOf(env.getOrDefault("FTP_TEST_SSL_PORT", "990")); + FTP_TEST_USER = env.getOrDefault("FTP_TEST_USER", "scott"); + FTP_TEST_PASSWORD = env.getOrDefault("FTP_TEST_PASSWORD", "tiger"); + FTP_TEST_SSL_TRUSTED_CA_CERT_FILE = env.getOrDefault("FTP_TEST_SSL_TRUSTED_CA_CERT_FILE", "dummy"); + FTP_TEST_SSL_TRUSTED_CA_CERT_DATA = env.getOrDefault("FTP_TEST_SSL_TRUSTED_CA_CERT_DATA", "dummy"); + + FTP_TEST_DIRECTORY = getDirectory(env.getOrDefault("FTP_TEST_DIRECTORY", "/unittest/")); + FTP_TEST_PATH_PREFIX = FTP_TEST_DIRECTORY + "sample_"; + } + + @Rule + public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); + private FtpFileInputPlugin plugin; + + @Before + public void createResources() + { + plugin = new FtpFileInputPlugin(); + runner = new FileInputRunner(runtime.getInstance(FtpFileInputPlugin.class)); + output = new MockPageOutput(); + } + + @Test(expected = RuntimeException.class) // TODO ConfigException should be thrown + public void testTransactionWithInvalidHost() + { + ConfigSource config = config().deepCopy() + .set("host", "non-exists.example.com"); + + runner.transaction(config, new Control()); + } + + @Test + public void testResume() + { + PluginTask task = config().loadConfig(PluginTask.class); + task.setSSLConfig(sslConfig(task)); + task.setFiles(Arrays.asList("in/aa/a")); + ConfigDiff configDiff = plugin.resume(task.dump(), 0, new FileInputPlugin.Control() + { + @Override + public List<TaskReport> run(TaskSource taskSource, int taskCount) + { + return emptyTaskReports(taskCount); + } + }); + assertThat(configDiff.get(String.class, "last_path"), is("in/aa/a")); + } + + @Test + public void testCleanup() + { + PluginTask task = config().loadConfig(PluginTask.class); + plugin.cleanup(task.dump(), 0, Lists.<TaskReport>newArrayList()); // no errors happens + } + + @Test + @SuppressWarnings("unchecked") + public void testListFilesWithNonExistPath() throws Exception + { + ConfigSource config = config().deepCopy() + .set("path_prefix", "non-exists-path"); + PluginTask task = config.loadConfig(PluginTask.class); + plugin.transaction(config, new FileInputPlugin.Control() { + @Override + public List<TaskReport> run(TaskSource taskSource, int taskCount) + { + assertThat(taskCount, is(0)); + return emptyTaskReports(taskCount); + } + }); + + Method method = FtpFileInputPlugin.class.getDeclaredMethod("listFiles", Logger.class, PluginTask.class); + method.setAccessible(true); + Logger logger = Exec.getLogger(FtpFileInputPlugin.class); + List<String> fileList = (List<String>) method.invoke(plugin, logger, task); + assertThat(fileList.size(), is(0)); + } + + @Test + @SuppressWarnings("unchecked") + public void testListFiles() throws Exception + { + List<String> expected = Arrays.asList( + FTP_TEST_PATH_PREFIX + "01.csv", + FTP_TEST_PATH_PREFIX + "02.csv" + ); + + ConfigSource config = config(); + final PluginTask task = config.loadConfig(PluginTask.class); + ConfigDiff configDiff = plugin.transaction(config, new FileInputPlugin.Control() { + @Override + public List<TaskReport> run(TaskSource taskSource, int taskCount) + { + assertThat(taskCount, is(2)); + return emptyTaskReports(taskCount); + } + }); + + Method method = FtpFileInputPlugin.class.getDeclaredMethod("listFiles", Logger.class, PluginTask.class); + method.setAccessible(true); + Logger logger = Exec.getLogger(FtpFileInputPlugin.class); + List<String> fileList = (List<String>) method.invoke(plugin, logger, task); + assertThat(fileList.get(0), is(expected.get(0))); + assertThat(fileList.get(1), is(expected.get(1))); + assertThat(configDiff.get(String.class, "last_path"), is(FTP_TEST_PATH_PREFIX + "02.csv")); + } + + @Test + public void testListFilesByPrefixIncrementalFalse() + { + ConfigSource config = config() + .deepCopy() + .set("incremental", false); + + ConfigDiff configDiff = runner.transaction(config, new Control()); + + assertThat(configDiff.toString(), is("{}")); + } + + @Test + @SuppressWarnings("unchecked") + public void testFtpFileInputByOpen() throws Exception + { + ConfigSource config = config(); + PluginTask task = config().loadConfig(PluginTask.class); + + runner.transaction(config, new Control()); + + Method method = FtpFileInputPlugin.class.getDeclaredMethod("listFiles", Logger.class, PluginTask.class); + method.setAccessible(true); + Logger logger = Exec.getLogger(FtpFileInputPlugin.class); + List<String> fileList = (List<String>) method.invoke(plugin, logger, task); + task.setFiles(fileList); + + assertRecords(config, output); + } + + private static List<TaskReport> emptyTaskReports(int taskCount) + { + ImmutableList.Builder<TaskReport> reports = new ImmutableList.Builder<>(); + for (int i = 0; i < taskCount; i++) { + reports.add(Exec.newTaskReport()); + } + return reports.build(); + } + + private class Control + implements InputPlugin.Control + { + @Override + public List<TaskReport> run(TaskSource taskSource, Schema schema, int taskCount) + { + List<TaskReport> reports = new ArrayList<>(); + for (int i = 0; i < taskCount; i++) { + reports.add(runner.run(taskSource, schema, i, output)); + } + return reports; + } + } + + private ConfigSource config() + { + return Exec.newConfigSource() + .set("host", FTP_TEST_HOST) + .set("port", FTP_TEST_PORT) + .set("user", FTP_TEST_USER) + .set("password", FTP_TEST_PASSWORD) + .set("path_prefix", FTP_TEST_PATH_PREFIX) + .set("last_path", "") + .set("file_ext", ".csv") + .set("max_connection_retry", 3) + .set("ssl", false) + .set("ssl_verify", false) + .set("ssl_verify_hostname", false) + .set("ssl_trusted_ca_cert_data", FTP_TEST_SSL_TRUSTED_CA_CERT_DATA) + .set("parser", parserConfig(schemaConfig())); + } + + private ImmutableMap<String, Object> parserConfig(ImmutableList<Object> schemaConfig) + { + ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>(); + builder.put("type", "csv"); + builder.put("newline", "CRLF"); + builder.put("delimiter", ","); + builder.put("quote", "\""); + builder.put("escape", "\""); + builder.put("trim_if_not_quoted", false); + builder.put("skip_header_lines", 1); + builder.put("allow_extra_columns", false); + builder.put("allow_optional_columns", false); + builder.put("columns", schemaConfig); + return builder.build(); + } + + private ImmutableList<Object> schemaConfig() + { + ImmutableList.Builder<Object> builder = new ImmutableList.Builder<>(); + builder.add(ImmutableMap.of("name", "id", "type", "long")); + builder.add(ImmutableMap.of("name", "account", "type", "long")); + builder.add(ImmutableMap.of("name", "time", "type", "timestamp", "format", "%Y-%m-%d %H:%M:%S")); + builder.add(ImmutableMap.of("name", "purchase", "type", "timestamp", "format", "%Y%m%d")); + builder.add(ImmutableMap.of("name", "comment", "type", "string")); + return builder.build(); + } + + public SSLPlugins.SSLPluginConfig sslConfig(PluginTask task) + { + return SSLPlugins.configure(task); + } + + private void assertRecords(ConfigSource config, MockPageOutput output) + { + List<Object[]> records = getRecords(config, output); + assertThat(records.size(), is(8)); + { + Object[] record = records.get(0); + assertThat((long) record[0], is(1L)); + assertThat((long) record[1], is(32864L)); + assertThat(record[2].toString(), is("2015-01-27 19:23:49 UTC")); + assertThat(record[3].toString(), is("2015-01-27 00:00:00 UTC")); + assertThat(record[4].toString(), is("embulk")); + } + + { + Object[] record = records.get(1); + assertThat((long) record[0], is(2L)); + assertThat((long) record[1], is(14824L)); + assertThat(record[2].toString(), is("2015-01-27 19:01:23 UTC")); + assertThat(record[3].toString(), is("2015-01-27 00:00:00 UTC")); + assertThat(record[4].toString(), is("embulk jruby")); + } + } + + private List<Object[]> getRecords(ConfigSource config, MockPageOutput output) + { + Schema schema = config.getNested("parser").loadConfig(CsvParserPlugin.PluginTask.class).getSchemaConfig().toSchema(); + return Pages.toObjects(schema, output.pages); + } + + private static String getDirectory(String dir) + { + if (dir != null && !dir.endsWith("/")) { + dir = dir + "/"; + } + if (dir.startsWith("/")) { + dir = dir.replaceFirst("/", ""); + } + return dir; + } }