package org.embulk.output.sftp; import; import; import org.apache.commons.vfs2.FileObject; import org.apache.commons.vfs2.FileSystemException; import org.apache.commons.vfs2.FileSystemOptions; import org.apache.commons.vfs2.impl.StandardFileSystemManager; import org.apache.commons.vfs2.provider.sftp.IdentityInfo; import org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder; import org.embulk.config.ConfigException; import org.embulk.config.TaskReport; import org.embulk.spi.Buffer; import org.embulk.spi.Exec; import org.embulk.spi.FileOutput; import org.embulk.spi.TransactionalFileOutput; import org.embulk.spi.unit.LocalFile; import org.slf4j.Logger; import java.lang.Void; import; import; import; import; import; import static org.embulk.output.sftp.SftpFileOutputPlugin.PluginTask; /** * Created by takahiro.nakayama on 10/20/15. */ public class SftpFileOutput implements FileOutput, TransactionalFileOutput { private final Logger logger = Exec.getLogger(SftpFileOutput.class); private final StandardFileSystemManager manager; private final FileSystemOptions fsOptions; private final String userInfo; private final String host; private final int port; private final int maxConnectionRetry; private final String pathPrefix; private final String sequenceFormat; private final String fileNameExtension; private final int taskIndex; private int fileIndex = 0; private FileObject currentFile; private OutputStream currentFileOutputStream; private StandardFileSystemManager initializeStandardFileSystemManager() { if (!logger.isDebugEnabled()) { // TODO: change logging format: org.apache.commons.logging.Log System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.NoOpLog"); } StandardFileSystemManager manager = new StandardFileSystemManager(); try { manager.init(); } catch (FileSystemException e) { logger.error(e.getMessage()); throw new ConfigException(e); } return manager; } private String initializeUserInfo(PluginTask task) { String userInfo = task.getUser(); if (task.getPassword().isPresent()) { userInfo += ":" + task.getPassword().get(); } return userInfo; } private FileSystemOptions initializeFsOptions(PluginTask task) { FileSystemOptions fsOptions = new FileSystemOptions(); try { SftpFileSystemConfigBuilder builder = SftpFileSystemConfigBuilder.getInstance(); builder.setUserDirIsRoot(fsOptions, task.getUserDirIsRoot()); builder.setTimeout(fsOptions, task.getSftpConnectionTimeout()); builder.setStrictHostKeyChecking(fsOptions, "no"); if (task.getSecretKeyFilePath().isPresent()) { IdentityInfo identityInfo = new IdentityInfo( new File((task.getSecretKeyFilePath().transform(localFileToPathString()).get())), task.getSecretKeyPassphrase().getBytes() ); builder.setIdentityInfo(fsOptions, identityInfo);"set identity: {}", task.getSecretKeyFilePath().get()); } if (task.getProxy().isPresent()) { ProxyTask proxy = task.getProxy().get(); ProxyTask.ProxyType.setProxyType(builder, fsOptions, proxy.getType()); if (proxy.getHost().isPresent()) { builder.setProxyHost(fsOptions, proxy.getHost().get()); builder.setProxyPort(fsOptions, proxy.getPort()); } if (proxy.getUser().isPresent()) { builder.setProxyUser(fsOptions, proxy.getUser().get()); } if (proxy.getPassword().isPresent()) { builder.setProxyPassword(fsOptions, proxy.getPassword().get()); } if (proxy.getCommand().isPresent()) { builder.setProxyCommand(fsOptions, proxy.getCommand().get()); } } } catch (FileSystemException e) { logger.error(e.getMessage()); throw new ConfigException(e); } return fsOptions; } SftpFileOutput(PluginTask task, int taskIndex) { this.manager = initializeStandardFileSystemManager(); this.userInfo = initializeUserInfo(task); this.fsOptions = initializeFsOptions(task); = task.getHost(); this.port = task.getPort(); this.maxConnectionRetry = task.getMaxConnectionRetry(); this.pathPrefix = task.getPathPrefix(); this.sequenceFormat = task.getSequenceFormat(); this.fileNameExtension = task.getFileNameExtension(); this.taskIndex = taskIndex; } @Override public void nextFile() { closeCurrentFile(); try { currentFile = newSftpFile(getSftpFileUri(getOutputFilePath())); currentFileOutputStream = newSftpOutputStream(currentFile);"new sftp file: {}", currentFile.getPublicURIString()); } catch (FileSystemException e) { logger.error(e.getMessage()); Throwables.propagate(e); } } @Override public void add(final Buffer buffer) { if (currentFile == null) { throw new IllegalStateException("nextFile() must be called before poll()"); } try { Retriable retriable = new Retriable() { public Void execute() throws IOException { currentFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit()); return null; } }; try { withConnectionRetry(retriable); } catch (Exception e) { throw (IOException)e; } } catch (IOException e) { logger.error(e.getMessage()); Throwables.propagate(e); } finally { buffer.release(); } } @Override public void finish() { closeCurrentFile(); } @Override public void close() { closeCurrentFile(); manager.close(); } @Override public void abort() { } @Override public TaskReport commit() { return Exec.newTaskReport(); } private void closeCurrentFile() { if (currentFile == null) { return; } try { currentFileOutputStream.close(); } catch (IOException e) {; } try { currentFile.close(); } catch (FileSystemException e) { logger.warn(e.getMessage()); } fileIndex++; currentFile = null; currentFileOutputStream = null; } private URI getSftpFileUri(String remoteFilePath) { try { return new URI("sftp", userInfo, host, port, remoteFilePath, null, null); } catch (URISyntaxException e) { logger.error(e.getMessage()); throw new ConfigException(e); } } private String getOutputFilePath() { return pathPrefix + String.format(sequenceFormat, taskIndex, fileIndex) + fileNameExtension; } interface Retriable { /** * Execute the operation with the given (or null) return value. * * @return any return value from the operation * @throws Exception */ public T execute() throws Exception; } private T withConnectionRetry( final Retriable op ) throws Exception { int count = 0; while (true) { try { return op.execute(); } catch(final Exception e) { if (++count > maxConnectionRetry) { throw e; } logger.warn("failed to connect sftp server: " + e.getMessage(), e); try { long sleepTime = ((long) Math.pow(2, count) * 1000); logger.warn("sleep in next connection retry: {} milliseconds", sleepTime); Thread.sleep(sleepTime); // milliseconds } catch (InterruptedException e1) { // Ignore this exception because this exception is just about `sleep`. logger.warn(e1.getMessage(), e1); } logger.warn("retry to connect sftp server: " + count + " times"); } } } private FileObject newSftpFile(final URI sftpUri) throws FileSystemException { Retriable retriable = new Retriable() { public FileObject execute() throws FileSystemException { FileObject file = manager.resolveFile(sftpUri.toString(), fsOptions); if (file.getParent().exists()) {"parent directory {} exists there", file.getParent()); } else {"trying to create parent directory {}", file.getParent()); file.getParent().createFolder(); } return file; } }; try { return withConnectionRetry(retriable); } catch (Exception e) { throw (FileSystemException)e; } } private OutputStream newSftpOutputStream(final FileObject file) throws FileSystemException { Retriable retriable = new Retriable() { public OutputStream execute() throws FileSystemException { return file.getContent().getOutputStream(); } }; try { return withConnectionRetry(retriable); } catch (Exception e) { throw (FileSystemException)e; } } private Function localFileToPathString() { return new Function() { public String apply(LocalFile file) { return file.getPath().toString(); } }; } }