src/main/java/org/embulk/output/sftp/SftpUtils.java in embulk-output-sftp-0.1.11 vs src/main/java/org/embulk/output/sftp/SftpUtils.java in embulk-output-sftp-0.2.0
- old
+ new
@@ -18,10 +18,11 @@
import org.embulk.spi.util.RetryExecutor.RetryGiveupException;
import org.embulk.spi.util.RetryExecutor.Retryable;
import org.slf4j.Logger;
import java.io.BufferedOutputStream;
+import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -37,11 +38,11 @@
* Created by takahiro.nakayama on 10/20/15.
*/
public class SftpUtils
{
private final Logger logger = Exec.getLogger(SftpUtils.class);
- private final DefaultFileSystemManager manager;
+ private DefaultFileSystemManager manager;
private final FileSystemOptions fsOptions;
private final String userInfo;
private final String user;
private final String host;
private final int port;
@@ -157,19 +158,28 @@
public Void call() throws Exception
{
final FileObject remoteFile = newSftpFile(getSftpFileUri(remotePath));
final BufferedOutputStream outputStream = openStream(remoteFile);
// When channel is broken, closing resource may hang, hence the time-out wrapper
- // Note: closing FileObject will also close OutputStream
- try (TimeoutCloser ignored = new TimeoutCloser(outputStream)) {
+ try (final TimeoutCloser ignored = new TimeoutCloser(outputStream)) {
appendFile(localTempFile, remoteFile, outputStream);
return null;
}
finally {
- remoteFile.close();
+ // closing sequentially
+ new TimeoutCloser(remoteFile).close();
}
}
+
+ @Override
+ public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
+ {
+ super.onRetry(exception, retryCount, retryLimit, retryWait);
+ // re-connect
+ manager.close();
+ manager = initializeStandardFileSystemManager();
+ }
});
}
/**
* This method won't close outputStream, outputStream is intended to keep open for next write
@@ -256,21 +266,12 @@
FileObject resolve(final String remoteFilePath) throws FileSystemException
{
return manager.resolveFile(getSftpFileUri(remoteFilePath).toString(), fsOptions);
}
- BufferedOutputStream openStream(final FileObject remoteFile)
+ BufferedOutputStream openStream(final FileObject remoteFile) throws FileSystemException
{
- // output stream is already a BufferedOutputStream, no need to wrap
- final String taskName = "SFTP open stream";
- return withRetry(new DefaultRetry<BufferedOutputStream>(taskName)
- {
- @Override
- public BufferedOutputStream call() throws Exception
- {
- return new BufferedOutputStream(remoteFile.getContent().getOutputStream());
- }
- });
+ return new BufferedOutputStream(remoteFile.getContent().getOutputStream());
}
URI getSftpFileUri(String remoteFilePath)
{
try {