Sha256: 668a090b14fe13af92519b7055e2c6baffbd69f5868762578f78cce868a47858
Contents?: true
Size: 1.7 KB
Versions: 1
Compression:
Stored size: 1.7 KB
Contents
package org.embulk.input.sftp; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.Exec; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.TransactionalFileInput; import java.util.List; public class SftpFileInputPlugin implements FileInputPlugin { @Override public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); // list files recursively task.setFiles(SftpFileInput.listFilesByPrefix(task)); // number of processors is same with number of files return resume(task.dump(), task.getFiles().getTaskCount(), control); } @Override public ConfigDiff resume(TaskSource taskSource, int taskCount, FileInputPlugin.Control control) { PluginTask task = taskSource.loadTask(PluginTask.class); control.run(taskSource, taskCount); ConfigDiff configDiff = Exec.newConfigDiff(); if (task.getIncremental()) { configDiff.set("last_path", SftpFileInput.getRelativePath(task.getFiles().getLastPath(task.getLastPath()))); } return configDiff; } @Override public void cleanup(TaskSource taskSource, int taskCount, List<TaskReport> successTaskReports) { } @Override public TransactionalFileInput open(TaskSource taskSource, int taskIndex) { PluginTask task = taskSource.loadTask(PluginTask.class); return new SftpFileInput(task, taskIndex); } }
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
embulk-input-sftp-0.2.0 | src/main/java/org/embulk/input/sftp/SftpFileInputPlugin.java |