src/main/java/org/embulk/output/sftp/SftpUtils.java in embulk-output-sftp-0.1.11 vs src/main/java/org/embulk/output/sftp/SftpUtils.java in embulk-output-sftp-0.2.0

- old
+ new

@@ -18,10 +18,11 @@ import org.embulk.spi.util.RetryExecutor.RetryGiveupException; import org.embulk.spi.util.RetryExecutor.Retryable; import org.slf4j.Logger; import java.io.BufferedOutputStream; +import java.io.Closeable; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -37,11 +38,11 @@ * Created by takahiro.nakayama on 10/20/15. */ public class SftpUtils { private final Logger logger = Exec.getLogger(SftpUtils.class); - private final DefaultFileSystemManager manager; + private DefaultFileSystemManager manager; private final FileSystemOptions fsOptions; private final String userInfo; private final String user; private final String host; private final int port; @@ -157,19 +158,28 @@ public Void call() throws Exception { final FileObject remoteFile = newSftpFile(getSftpFileUri(remotePath)); final BufferedOutputStream outputStream = openStream(remoteFile); // When channel is broken, closing resource may hang, hence the time-out wrapper - // Note: closing FileObject will also close OutputStream - try (TimeoutCloser ignored = new TimeoutCloser(outputStream)) { + try (final TimeoutCloser ignored = new TimeoutCloser(outputStream)) { appendFile(localTempFile, remoteFile, outputStream); return null; } finally { - remoteFile.close(); + // closing sequentially + new TimeoutCloser(remoteFile).close(); } } + + @Override + public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) + { + super.onRetry(exception, retryCount, retryLimit, retryWait); + // re-connect + manager.close(); + manager = initializeStandardFileSystemManager(); + } }); } /** * This method won't close outputStream, outputStream is intended to keep open for next write @@ -256,21 +266,12 @@ FileObject resolve(final String remoteFilePath) throws FileSystemException { return manager.resolveFile(getSftpFileUri(remoteFilePath).toString(), fsOptions); } - BufferedOutputStream openStream(final FileObject remoteFile) + BufferedOutputStream openStream(final FileObject remoteFile) throws FileSystemException { - // output stream is already a BufferedOutputStream, no need to wrap - final String taskName = "SFTP open stream"; - return withRetry(new DefaultRetry<BufferedOutputStream>(taskName) - { - @Override - public BufferedOutputStream call() throws Exception - { - return new BufferedOutputStream(remoteFile.getContent().getOutputStream()); - } - }); + return new BufferedOutputStream(remoteFile.getContent().getOutputStream()); } URI getSftpFileUri(String remoteFilePath) { try {