src/main/java/org/embulk/output/sftp/SftpUtils.java in embulk-output-sftp-0.1.10 vs src/main/java/org/embulk/output/sftp/SftpUtils.java in embulk-output-sftp-0.1.11
- old
+ new
@@ -1,16 +1,20 @@
package org.embulk.output.sftp;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.FileSystemOptions;
import org.apache.commons.vfs2.impl.DefaultFileSystemManager;
import org.apache.commons.vfs2.provider.sftp.IdentityInfo;
import org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder;
import org.embulk.config.ConfigException;
+import org.embulk.output.sftp.utils.DefaultRetry;
+import org.embulk.output.sftp.utils.TimedCallable;
+import org.embulk.output.sftp.utils.TimeoutCloser;
import org.embulk.spi.Exec;
import org.embulk.spi.unit.LocalFile;
import org.embulk.spi.util.RetryExecutor.RetryGiveupException;
import org.embulk.spi.util.RetryExecutor.Retryable;
import org.slf4j.Logger;
@@ -18,12 +22,14 @@
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static org.embulk.output.sftp.SftpFileOutputPlugin.PluginTask;
import static org.embulk.spi.util.RetryExecutor.retryExecutor;
@@ -38,10 +44,12 @@
private final String userInfo;
private final String user;
private final String host;
private final int port;
private final int maxConnectionRetry;
+ @VisibleForTesting
+ int writeTimeout = 300; // 5 minutes
private DefaultFileSystemManager initializeStandardFileSystemManager()
{
if (!logger.isDebugEnabled()) {
// TODO: change logging format: org.apache.commons.logging.Log
@@ -51,11 +59,11 @@
* We can use StandardFileSystemManager instead of DefaultFileSystemManager
* when Apache Commons VFS2 removes permission check logic when remote file renaming.
* https://github.com/embulk/embulk-output-sftp/issues/40
* https://github.com/embulk/embulk-output-sftp/pull/44
* https://issues.apache.org/jira/browse/VFS-590
- */
+ */
DefaultFileSystemManager manager = new DefaultFileSystemManager();
try {
manager.addProvider("sftp", new org.embulk.output.sftp.provider.sftp.SftpFileProvider());
manager.init();
}
@@ -85,12 +93,12 @@
builder.setUserDirIsRoot(fsOptions, task.getUserDirIsRoot());
builder.setTimeout(fsOptions, task.getSftpConnectionTimeout() * 1000);
builder.setStrictHostKeyChecking(fsOptions, "no");
if (task.getSecretKeyFilePath().isPresent()) {
IdentityInfo identityInfo = new IdentityInfo(
- new File((task.getSecretKeyFilePath().transform(localFileToPathString()).get())),
- task.getSecretKeyPassphrase().getBytes()
+ new File((task.getSecretKeyFilePath().transform(localFileToPathString()).get())),
+ task.getSecretKeyPassphrase().getBytes()
);
builder.setIdentityInfo(fsOptions, identityInfo);
logger.info("set identity: {}", task.getSecretKeyFilePath().get());
}
@@ -139,137 +147,97 @@
public void close()
{
manager.close();
}
- public Void uploadFile(final File localTempFile, final String remotePath)
+ void uploadFile(final File localTempFile, final String remotePath)
{
- try {
- return retryExecutor()
- .withRetryLimit(maxConnectionRetry)
- .withInitialRetryWait(500)
- .withMaxRetryWait(30 * 1000)
- .runInterruptible(new Retryable<Void>() {
- @Override
- public Void call() throws IOException
- {
- long size = localTempFile.length();
- int step = 10; // 10% each step
- long bytesPerStep = size / step;
- long startTime = System.nanoTime();
+ withRetry(new DefaultRetry<Void>(String.format("SFTP upload file '%s'", remotePath))
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ final FileObject remoteFile = newSftpFile(getSftpFileUri(remotePath));
+ final BufferedOutputStream outputStream = openStream(remoteFile);
+ // When channel is broken, closing resource may hang, hence the time-out wrapper
+ // Note: closing FileObject will also close OutputStream
+ try (TimeoutCloser ignored = new TimeoutCloser(outputStream)) {
+ appendFile(localTempFile, remoteFile, outputStream);
+ return null;
+ }
+ finally {
+ remoteFile.close();
+ }
+ }
+ });
+ }
- try (FileObject remoteFile = newSftpFile(getSftpFileUri(remotePath));
- InputStream inputStream = new FileInputStream(localTempFile);
- BufferedOutputStream outputStream = new BufferedOutputStream(remoteFile.getContent().getOutputStream());
- ) {
- logger.info("Uploading to remote sftp file ({} KB): {}", size / 1024, remoteFile.getPublicURIString());
- byte[] buffer = new byte[32 * 1024 * 1024]; // 32MB buffer size
- int len = inputStream.read(buffer);
- long total = 0;
- int progress = 0;
- while (len != -1) {
- outputStream.write(buffer, 0, len);
- len = inputStream.read(buffer);
- total += len;
- if (total / bytesPerStep > progress) {
- progress = (int) (total / bytesPerStep);
- long transferRate = (long) (total / ((System.nanoTime() - startTime) / 1e9));
- logger.info("Upload progress: {}% - {} KB - {} KB/s",
- progress * step, total / 1024, transferRate / 1024);
- }
- }
- logger.info("Upload completed.");
- }
- return null;
- }
+ /**
+ * This method won't close outputStream, outputStream is intended to keep open for next write
+ *
+ * @param localTempFile
+ * @param remoteFile
+ * @param outputStream
+ * @throws IOException
+ */
+ void appendFile(final File localTempFile, final FileObject remoteFile, final BufferedOutputStream outputStream) throws IOException
+ {
+ long size = localTempFile.length();
+ int step = 10; // 10% each step
+ long bytesPerStep = Math.max(size / step, 1); // to prevent / 0 if file size < 10 bytes
+ long startTime = System.nanoTime();
- @Override
- public boolean isRetryableException(Exception exception)
- {
- if (exception instanceof ConfigException) {
- return false;
- }
- return true;
- }
-
- @Override
- public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException
- {
- String message = String.format("SFTP output failed. Retrying %d/%d after %d seconds. Message: %s",
- retryCount, retryLimit, retryWait / 1000, exception.getMessage());
- if (retryCount % 3 == 0) {
- logger.warn(message, exception);
- }
- else {
- logger.warn(message);
- }
- }
-
- @Override
- public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException
- {
- }
- });
+ // start uploading
+ try (InputStream inputStream = new FileInputStream(localTempFile)) {
+ logger.info("Uploading to remote sftp file ({} KB): {}", size / 1024, remoteFile.getPublicURIString());
+ final byte[] buffer = new byte[32 * 1024 * 1024]; // 32MB buffer size
+ int len = inputStream.read(buffer);
+ long total = 0;
+ int progress = 0;
+ while (len != -1) {
+ timedWrite(outputStream, buffer, len);
+ len = inputStream.read(buffer);
+ total += len;
+ if (total / bytesPerStep > progress) {
+ progress = (int) (total / bytesPerStep);
+ long transferRate = (long) (total / ((System.nanoTime() - startTime) / 1e9));
+ logger.info("Upload progress: {}% - {} KB - {} KB/s",
+ progress * step, total / 1024, transferRate / 1024);
+ }
+ }
+ logger.info("Upload completed.");
}
- catch (RetryGiveupException ex) {
- throw Throwables.propagate(ex.getCause());
- }
- catch (InterruptedException ex) {
- throw Throwables.propagate(ex);
- }
}
public Void renameFile(final String before, final String after)
{
- try {
- return retryExecutor()
- .withRetryLimit(maxConnectionRetry)
- .withInitialRetryWait(500)
- .withMaxRetryWait(30 * 1000)
- .runInterruptible(new Retryable<Void>() {
- @Override
- public Void call() throws IOException
- {
- FileObject previousFile = manager.resolveFile(getSftpFileUri(before).toString(), fsOptions);
- FileObject afterFile = manager.resolveFile(getSftpFileUri(after).toString(), fsOptions);
- previousFile.moveTo(afterFile);
- logger.info("renamed remote file: {} to {}", previousFile.getPublicURIString(), afterFile.getPublicURIString());
+ return withRetry(new DefaultRetry<Void>("SFTP rename remote file")
+ {
+ @Override
+ public Void call() throws IOException
+ {
+ FileObject previousFile = resolve(before);
+ FileObject afterFile = resolve(after);
+ previousFile.moveTo(afterFile);
+ logger.info("renamed remote file: {} to {}", previousFile.getPublicURIString(), afterFile.getPublicURIString());
- return null;
- }
+ return null;
+ }
+ });
+ }
- @Override
- public boolean isRetryableException(Exception exception)
- {
- return true;
- }
-
- @Override
- public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException
- {
- String message = String.format("SFTP rename remote file failed. Retrying %d/%d after %d seconds. Message: %s",
- retryCount, retryLimit, retryWait / 1000, exception.getMessage());
- if (retryCount % 3 == 0) {
- logger.warn(message, exception);
- }
- else {
- logger.warn(message);
- }
- }
-
- @Override
- public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException
- {
- }
- });
+ public void deleteFile(final String remotePath)
+ {
+ try {
+ FileObject file = manager.resolveFile(getSftpFileUri(remotePath).toString(), fsOptions);
+ if (file.exists()) {
+ file.delete();
+ }
}
- catch (RetryGiveupException ex) {
- throw Throwables.propagate(ex.getCause());
+ catch (FileSystemException e) {
+ logger.warn("Failed to delete remote file '{}': {}", remotePath, e.getMessage());
}
- catch (InterruptedException ex) {
- throw Throwables.propagate(ex);
- }
}
public void validateHost(PluginTask task)
{
Pattern pattern = Pattern.compile("\\s");
@@ -283,22 +251,41 @@
throw new ConfigException("'proxy.host' can't contains spaces");
}
}
}
- private URI getSftpFileUri(String remoteFilePath)
+ FileObject resolve(final String remoteFilePath) throws FileSystemException
{
+ return manager.resolveFile(getSftpFileUri(remoteFilePath).toString(), fsOptions);
+ }
+
+ BufferedOutputStream openStream(final FileObject remoteFile)
+ {
+ // output stream is already a BufferedOutputStream, no need to wrap
+ final String taskName = "SFTP open stream";
+ return withRetry(new DefaultRetry<BufferedOutputStream>(taskName)
+ {
+ @Override
+ public BufferedOutputStream call() throws Exception
+ {
+ return new BufferedOutputStream(remoteFile.getContent().getOutputStream());
+ }
+ });
+ }
+
+ URI getSftpFileUri(String remoteFilePath)
+ {
try {
return new URI("sftp", userInfo, host, port, remoteFilePath, null, null);
}
catch (URISyntaxException e) {
String message = String.format("URISyntaxException was thrown: Illegal character in sftp://%s:******@%s:%s%s", user, host, port, remoteFilePath);
throw new ConfigException(message);
}
}
- private FileObject newSftpFile(final URI sftpUri) throws FileSystemException
+ FileObject newSftpFile(final URI sftpUri) throws FileSystemException
{
FileObject file = manager.resolveFile(sftpUri.toString(), fsOptions);
if (file.exists()) {
file.delete();
}
@@ -319,7 +306,43 @@
public String apply(LocalFile file)
{
return file.getPath().toString();
}
};
+ }
+
+ private <T> T withRetry(Retryable<T> call)
+ {
+ try {
+ return retryExecutor()
+ .withRetryLimit(maxConnectionRetry)
+ .withInitialRetryWait(500)
+ .withMaxRetryWait(30 * 1000)
+ .runInterruptible(call);
+ }
+ catch (RetryGiveupException ex) {
+ throw Throwables.propagate(ex.getCause());
+ }
+ catch (InterruptedException ex) {
+ throw Throwables.propagate(ex);
+ }
+ }
+
+ private void timedWrite(final OutputStream stream, final byte[] buf, final int len) throws IOException
+ {
+ try {
+ new TimedCallable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ stream.write(buf, 0, len);
+ return null;
+ }
+ }.call(writeTimeout, TimeUnit.SECONDS);
+ }
+ catch (Exception e) {
+ logger.warn("Failed to write buffer, aborting ... ");
+ throw new IOException(e);
+ }
}
}