Sha256: 6353366aa3eff994108c0682ebd43cf38e7a0962059c39d7ecca0e840f46a6a2
Contents?: true
Size: 1.84 KB
Versions: 9
Compression:
Stored size: 1.84 KB
Contents
package org.embulk.input.sftp; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.Exec; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.TransactionalFileInput; import java.util.List; public class SftpFileInputPlugin implements FileInputPlugin { @Override public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); // list files recursively task.setFiles(SftpFileInput.listFilesByPrefix(task)); // number of processors is same with number of files return resume(task.dump(), task.getFiles().getTaskCount(), control); } @Override public ConfigDiff resume(TaskSource taskSource, int taskCount, FileInputPlugin.Control control) { PluginTask task = taskSource.loadTask(PluginTask.class); String lastPath = null; if (task.getIncremental()) { lastPath = SftpFileInput.getRelativePath(task, task.getFiles().getLastPath(task.getLastPath())); } control.run(taskSource, taskCount); ConfigDiff configDiff = Exec.newConfigDiff(); if (task.getIncremental() && lastPath != null) { configDiff.set("last_path", lastPath); } return configDiff; } @Override public void cleanup(TaskSource taskSource, int taskCount, List<TaskReport> successTaskReports) { } @Override public TransactionalFileInput open(TaskSource taskSource, int taskIndex) { PluginTask task = taskSource.loadTask(PluginTask.class); return new SftpFileInput(task, taskIndex); } }
Version data entries
9 entries across 9 versions & 1 rubygems