package org.embulk.output.sftp; import com.google.common.base.Optional; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; import org.embulk.config.Task; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.Exec; import org.embulk.spi.FileOutputPlugin; import org.embulk.spi.TransactionalFileOutput; import org.slf4j.Logger; import java.util.List; public class SftpFileOutputPlugin implements FileOutputPlugin { private Logger logger = Exec.getLogger(SftpFileOutputPlugin.class); public interface PluginTask extends Task { @Config("host") public String getHost(); @Config("port") @ConfigDefault("22") public int getPort(); @Config("user") public String getUser(); @Config("password") @ConfigDefault("null") public Optional getPassword(); @Config("secret_key_file") @ConfigDefault("null") public Optional getSecretKeyFilePath(); @Config("secret_key_passphrase") @ConfigDefault("\"\"") public String getSecretKeyPassphrase(); @Config("user_directory_is_root") @ConfigDefault("true") public Boolean getUserDirIsRoot(); @Config("timeout") @ConfigDefault("600") // 10 minutes public int getSftpConnectionTimeout(); @Config("max_connection_retry") @ConfigDefault("5") // 5 times retry to connect sftp server if failed. public int getMaxConnectionRetry(); @Config("path_prefix") public String getPathPrefix(); @Config("file_ext") public String getFileNameExtension(); @Config("sequence_format") @ConfigDefault("\"%03d.%02d.\"") public String getSequenceFormat(); } @Override public ConfigDiff transaction(ConfigSource config, int taskCount, FileOutputPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); // retryable (idempotent) output: // return resume(task.dump(), taskCount, control); // non-retryable (non-idempotent) output: control.run(task.dump()); return Exec.newConfigDiff(); } @Override public ConfigDiff resume(TaskSource taskSource, int taskCount, FileOutputPlugin.Control control) { throw new UnsupportedOperationException("sftp output plugin does not support resuming"); } @Override public void cleanup(TaskSource taskSource, int taskCount, List successTaskReports) { } @Override public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex) { final PluginTask task = taskSource.loadTask(PluginTask.class); return new SftpFileOutput(task, taskIndex); } }