src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.10 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.11
- old
+ new
@@ -2,20 +2,22 @@
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.InputStream;
import java.math.BigInteger;
import com.google.api.client.http.HttpResponseException;
import com.google.common.collect.ImmutableList;
import com.google.common.base.Optional;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.base.Charsets;
import com.google.common.io.BaseEncoding;
+import com.google.common.annotations.VisibleForTesting;
import java.security.GeneralSecurityException;
import org.embulk.config.TaskReport;
import org.embulk.config.Config;
import org.embulk.config.ConfigInject;
@@ -29,10 +31,14 @@
import org.embulk.spi.BufferAllocator;
import org.embulk.spi.FileInputPlugin;
import org.embulk.spi.TransactionalFileInput;
import org.embulk.spi.unit.LocalFile;
import org.embulk.spi.util.InputStreamFileInput;
+import org.embulk.spi.util.ResumableInputStream;
+import org.embulk.spi.util.RetryExecutor.Retryable;
+import org.embulk.spi.util.RetryExecutor.RetryGiveupException;
+import static org.embulk.spi.util.RetryExecutor.retryExecutor;
import org.slf4j.Logger;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Bucket;
@@ -276,10 +282,77 @@
{
PluginTask task = taskSource.loadTask(PluginTask.class);
return new GcsFileInput(task, taskIndex);
}
+ @VisibleForTesting
+ static class GcsInputStreamReopener
+ implements ResumableInputStream.Reopener
+ {
+ private final Logger log = Exec.getLogger(GcsInputStreamReopener.class);
+ private final Storage client;
+ private final String bucket;
+ private final String key;
+
+ public GcsInputStreamReopener(Storage client, String bucket, String key)
+ {
+ this.client = client;
+ this.bucket = bucket;
+ this.key = key;
+ }
+
+ @Override
+ public InputStream reopen(final long offset, final Exception closedCause) throws IOException
+ {
+ try {
+ return retryExecutor()
+ .withRetryLimit(3)
+ .withInitialRetryWait(500)
+ .withMaxRetryWait(30 * 1000)
+ .runInterruptible(new Retryable<InputStream>() {
+ @Override
+ public InputStream call() throws InterruptedIOException, IOException
+ {
+ log.warn(String.format("GCS read failed. Retrying GET request with %,d bytes offset", offset), closedCause);
+ Storage.Objects.Get getObject = client.objects().get(bucket, key);
+ return getObject.executeMediaAsInputStream();
+ }
+
+ @Override
+ public boolean isRetryableException(Exception exception)
+ {
+ return true; // TODO
+ }
+
+ @Override
+ public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
+ throws RetryGiveupException
+ {
+ String message = String.format("GCS GET request failed. Retrying %d/%d after %d seconds. Message: %s",
+ retryCount, retryLimit, retryWait/1000, exception.getMessage());
+ if (retryCount % 3 == 0) {
+ log.warn(message, exception);
+ } else {
+ log.warn(message);
+ }
+ }
+
+ @Override
+ public void onGiveup(Exception firstException, Exception lastException)
+ throws RetryGiveupException
+ {
+ }
+ });
+ } catch (RetryGiveupException ex) {
+ Throwables.propagateIfInstanceOf(ex.getCause(), IOException.class);
+ throw Throwables.propagate(ex.getCause());
+ } catch (InterruptedException ex) {
+ throw new InterruptedIOException();
+ }
+ }
+ }
+
public class GcsFileInput
extends InputStreamFileInput
implements TransactionalFileInput
{
public GcsFileInput(PluginTask task, int taskIndex)
@@ -320,10 +393,10 @@
return null;
}
opened = true;
Storage.Objects.Get getObject = client.objects().get(bucket, key);
- return getObject.executeMediaAsInputStream();
+ return new ResumableInputStream(getObject.executeMediaAsInputStream(), new GcsInputStreamReopener(client, bucket, key));
}
@Override
public void close() { }
}