src/main/java/org/embulk/output/sftp/SftpUtils.java in embulk-output-sftp-0.1.10 vs src/main/java/org/embulk/output/sftp/SftpUtils.java in embulk-output-sftp-0.1.11

- old
+ new

@@ -1,16 +1,20 @@ package org.embulk.output.sftp; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Throwables; import org.apache.commons.vfs2.FileObject; import org.apache.commons.vfs2.FileSystemException; import org.apache.commons.vfs2.FileSystemOptions; import org.apache.commons.vfs2.impl.DefaultFileSystemManager; import org.apache.commons.vfs2.provider.sftp.IdentityInfo; import org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder; import org.embulk.config.ConfigException; +import org.embulk.output.sftp.utils.DefaultRetry; +import org.embulk.output.sftp.utils.TimedCallable; +import org.embulk.output.sftp.utils.TimeoutCloser; import org.embulk.spi.Exec; import org.embulk.spi.unit.LocalFile; import org.embulk.spi.util.RetryExecutor.RetryGiveupException; import org.embulk.spi.util.RetryExecutor.Retryable; import org.slf4j.Logger; @@ -18,12 +22,14 @@ import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import static org.embulk.output.sftp.SftpFileOutputPlugin.PluginTask; import static org.embulk.spi.util.RetryExecutor.retryExecutor; @@ -38,10 +44,12 @@ private final String userInfo; private final String user; private final String host; private final int port; private final int maxConnectionRetry; + @VisibleForTesting + int writeTimeout = 300; // 5 minutes private DefaultFileSystemManager initializeStandardFileSystemManager() { if (!logger.isDebugEnabled()) { // TODO: change logging format: org.apache.commons.logging.Log @@ -51,11 +59,11 @@ * We can use StandardFileSystemManager instead of DefaultFileSystemManager * when Apache Commons VFS2 removes permission check logic when remote file renaming. * https://github.com/embulk/embulk-output-sftp/issues/40 * https://github.com/embulk/embulk-output-sftp/pull/44 * https://issues.apache.org/jira/browse/VFS-590 - */ + */ DefaultFileSystemManager manager = new DefaultFileSystemManager(); try { manager.addProvider("sftp", new org.embulk.output.sftp.provider.sftp.SftpFileProvider()); manager.init(); } @@ -85,12 +93,12 @@ builder.setUserDirIsRoot(fsOptions, task.getUserDirIsRoot()); builder.setTimeout(fsOptions, task.getSftpConnectionTimeout() * 1000); builder.setStrictHostKeyChecking(fsOptions, "no"); if (task.getSecretKeyFilePath().isPresent()) { IdentityInfo identityInfo = new IdentityInfo( - new File((task.getSecretKeyFilePath().transform(localFileToPathString()).get())), - task.getSecretKeyPassphrase().getBytes() + new File((task.getSecretKeyFilePath().transform(localFileToPathString()).get())), + task.getSecretKeyPassphrase().getBytes() ); builder.setIdentityInfo(fsOptions, identityInfo); logger.info("set identity: {}", task.getSecretKeyFilePath().get()); } @@ -139,137 +147,97 @@ public void close() { manager.close(); } - public Void uploadFile(final File localTempFile, final String remotePath) + void uploadFile(final File localTempFile, final String remotePath) { - try { - return retryExecutor() - .withRetryLimit(maxConnectionRetry) - .withInitialRetryWait(500) - .withMaxRetryWait(30 * 1000) - .runInterruptible(new Retryable<Void>() { - @Override - public Void call() throws IOException - { - long size = localTempFile.length(); - int step = 10; // 10% each step - long bytesPerStep = size / step; - long startTime = System.nanoTime(); + withRetry(new DefaultRetry<Void>(String.format("SFTP upload file '%s'", remotePath)) + { + @Override + public Void call() throws Exception + { + final FileObject remoteFile = newSftpFile(getSftpFileUri(remotePath)); + final BufferedOutputStream outputStream = openStream(remoteFile); + // When channel is broken, closing resource may hang, hence the time-out wrapper + // Note: closing FileObject will also close OutputStream + try (TimeoutCloser ignored = new TimeoutCloser(outputStream)) { + appendFile(localTempFile, remoteFile, outputStream); + return null; + } + finally { + remoteFile.close(); + } + } + }); + } - try (FileObject remoteFile = newSftpFile(getSftpFileUri(remotePath)); - InputStream inputStream = new FileInputStream(localTempFile); - BufferedOutputStream outputStream = new BufferedOutputStream(remoteFile.getContent().getOutputStream()); - ) { - logger.info("Uploading to remote sftp file ({} KB): {}", size / 1024, remoteFile.getPublicURIString()); - byte[] buffer = new byte[32 * 1024 * 1024]; // 32MB buffer size - int len = inputStream.read(buffer); - long total = 0; - int progress = 0; - while (len != -1) { - outputStream.write(buffer, 0, len); - len = inputStream.read(buffer); - total += len; - if (total / bytesPerStep > progress) { - progress = (int) (total / bytesPerStep); - long transferRate = (long) (total / ((System.nanoTime() - startTime) / 1e9)); - logger.info("Upload progress: {}% - {} KB - {} KB/s", - progress * step, total / 1024, transferRate / 1024); - } - } - logger.info("Upload completed."); - } - return null; - } + /** + * This method won't close outputStream, outputStream is intended to keep open for next write + * + * @param localTempFile + * @param remoteFile + * @param outputStream + * @throws IOException + */ + void appendFile(final File localTempFile, final FileObject remoteFile, final BufferedOutputStream outputStream) throws IOException + { + long size = localTempFile.length(); + int step = 10; // 10% each step + long bytesPerStep = Math.max(size / step, 1); // to prevent / 0 if file size < 10 bytes + long startTime = System.nanoTime(); - @Override - public boolean isRetryableException(Exception exception) - { - if (exception instanceof ConfigException) { - return false; - } - return true; - } - - @Override - public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException - { - String message = String.format("SFTP output failed. Retrying %d/%d after %d seconds. Message: %s", - retryCount, retryLimit, retryWait / 1000, exception.getMessage()); - if (retryCount % 3 == 0) { - logger.warn(message, exception); - } - else { - logger.warn(message); - } - } - - @Override - public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException - { - } - }); + // start uploading + try (InputStream inputStream = new FileInputStream(localTempFile)) { + logger.info("Uploading to remote sftp file ({} KB): {}", size / 1024, remoteFile.getPublicURIString()); + final byte[] buffer = new byte[32 * 1024 * 1024]; // 32MB buffer size + int len = inputStream.read(buffer); + long total = 0; + int progress = 0; + while (len != -1) { + timedWrite(outputStream, buffer, len); + len = inputStream.read(buffer); + total += len; + if (total / bytesPerStep > progress) { + progress = (int) (total / bytesPerStep); + long transferRate = (long) (total / ((System.nanoTime() - startTime) / 1e9)); + logger.info("Upload progress: {}% - {} KB - {} KB/s", + progress * step, total / 1024, transferRate / 1024); + } + } + logger.info("Upload completed."); } - catch (RetryGiveupException ex) { - throw Throwables.propagate(ex.getCause()); - } - catch (InterruptedException ex) { - throw Throwables.propagate(ex); - } } public Void renameFile(final String before, final String after) { - try { - return retryExecutor() - .withRetryLimit(maxConnectionRetry) - .withInitialRetryWait(500) - .withMaxRetryWait(30 * 1000) - .runInterruptible(new Retryable<Void>() { - @Override - public Void call() throws IOException - { - FileObject previousFile = manager.resolveFile(getSftpFileUri(before).toString(), fsOptions); - FileObject afterFile = manager.resolveFile(getSftpFileUri(after).toString(), fsOptions); - previousFile.moveTo(afterFile); - logger.info("renamed remote file: {} to {}", previousFile.getPublicURIString(), afterFile.getPublicURIString()); + return withRetry(new DefaultRetry<Void>("SFTP rename remote file") + { + @Override + public Void call() throws IOException + { + FileObject previousFile = resolve(before); + FileObject afterFile = resolve(after); + previousFile.moveTo(afterFile); + logger.info("renamed remote file: {} to {}", previousFile.getPublicURIString(), afterFile.getPublicURIString()); - return null; - } + return null; + } + }); + } - @Override - public boolean isRetryableException(Exception exception) - { - return true; - } - - @Override - public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException - { - String message = String.format("SFTP rename remote file failed. Retrying %d/%d after %d seconds. Message: %s", - retryCount, retryLimit, retryWait / 1000, exception.getMessage()); - if (retryCount % 3 == 0) { - logger.warn(message, exception); - } - else { - logger.warn(message); - } - } - - @Override - public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException - { - } - }); + public void deleteFile(final String remotePath) + { + try { + FileObject file = manager.resolveFile(getSftpFileUri(remotePath).toString(), fsOptions); + if (file.exists()) { + file.delete(); + } } - catch (RetryGiveupException ex) { - throw Throwables.propagate(ex.getCause()); + catch (FileSystemException e) { + logger.warn("Failed to delete remote file '{}': {}", remotePath, e.getMessage()); } - catch (InterruptedException ex) { - throw Throwables.propagate(ex); - } } public void validateHost(PluginTask task) { Pattern pattern = Pattern.compile("\\s"); @@ -283,22 +251,41 @@ throw new ConfigException("'proxy.host' can't contains spaces"); } } } - private URI getSftpFileUri(String remoteFilePath) + FileObject resolve(final String remoteFilePath) throws FileSystemException { + return manager.resolveFile(getSftpFileUri(remoteFilePath).toString(), fsOptions); + } + + BufferedOutputStream openStream(final FileObject remoteFile) + { + // output stream is already a BufferedOutputStream, no need to wrap + final String taskName = "SFTP open stream"; + return withRetry(new DefaultRetry<BufferedOutputStream>(taskName) + { + @Override + public BufferedOutputStream call() throws Exception + { + return new BufferedOutputStream(remoteFile.getContent().getOutputStream()); + } + }); + } + + URI getSftpFileUri(String remoteFilePath) + { try { return new URI("sftp", userInfo, host, port, remoteFilePath, null, null); } catch (URISyntaxException e) { String message = String.format("URISyntaxException was thrown: Illegal character in sftp://%s:******@%s:%s%s", user, host, port, remoteFilePath); throw new ConfigException(message); } } - private FileObject newSftpFile(final URI sftpUri) throws FileSystemException + FileObject newSftpFile(final URI sftpUri) throws FileSystemException { FileObject file = manager.resolveFile(sftpUri.toString(), fsOptions); if (file.exists()) { file.delete(); } @@ -319,7 +306,43 @@ public String apply(LocalFile file) { return file.getPath().toString(); } }; + } + + private <T> T withRetry(Retryable<T> call) + { + try { + return retryExecutor() + .withRetryLimit(maxConnectionRetry) + .withInitialRetryWait(500) + .withMaxRetryWait(30 * 1000) + .runInterruptible(call); + } + catch (RetryGiveupException ex) { + throw Throwables.propagate(ex.getCause()); + } + catch (InterruptedException ex) { + throw Throwables.propagate(ex); + } + } + + private void timedWrite(final OutputStream stream, final byte[] buf, final int len) throws IOException + { + try { + new TimedCallable<Void>() + { + @Override + public Void call() throws Exception + { + stream.write(buf, 0, len); + return null; + } + }.call(writeTimeout, TimeUnit.SECONDS); + } + catch (Exception e) { + logger.warn("Failed to write buffer, aborting ... "); + throw new IOException(e); + } } }