src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.10 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.11

- old
+ new

@@ -2,20 +2,22 @@ import java.util.List; import java.util.ArrayList; import java.util.Collections; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.InputStream; import java.math.BigInteger; import com.google.api.client.http.HttpResponseException; import com.google.common.collect.ImmutableList; import com.google.common.base.Optional; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.base.Charsets; import com.google.common.io.BaseEncoding; +import com.google.common.annotations.VisibleForTesting; import java.security.GeneralSecurityException; import org.embulk.config.TaskReport; import org.embulk.config.Config; import org.embulk.config.ConfigInject; @@ -29,10 +31,14 @@ import org.embulk.spi.BufferAllocator; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.TransactionalFileInput; import org.embulk.spi.unit.LocalFile; import org.embulk.spi.util.InputStreamFileInput; +import org.embulk.spi.util.ResumableInputStream; +import org.embulk.spi.util.RetryExecutor.Retryable; +import org.embulk.spi.util.RetryExecutor.RetryGiveupException; +import static org.embulk.spi.util.RetryExecutor.retryExecutor; import org.slf4j.Logger; import com.google.api.services.storage.Storage; import com.google.api.services.storage.model.Bucket; @@ -276,10 +282,77 @@ { PluginTask task = taskSource.loadTask(PluginTask.class); return new GcsFileInput(task, taskIndex); } + @VisibleForTesting + static class GcsInputStreamReopener + implements ResumableInputStream.Reopener + { + private final Logger log = Exec.getLogger(GcsInputStreamReopener.class); + private final Storage client; + private final String bucket; + private final String key; + + public GcsInputStreamReopener(Storage client, String bucket, String key) + { + this.client = client; + this.bucket = bucket; + this.key = key; + } + + @Override + public InputStream reopen(final long offset, final Exception closedCause) throws IOException + { + try { + return retryExecutor() + .withRetryLimit(3) + .withInitialRetryWait(500) + .withMaxRetryWait(30 * 1000) + .runInterruptible(new Retryable<InputStream>() { + @Override + public InputStream call() throws InterruptedIOException, IOException + { + log.warn(String.format("GCS read failed. Retrying GET request with %,d bytes offset", offset), closedCause); + Storage.Objects.Get getObject = client.objects().get(bucket, key); + return getObject.executeMediaAsInputStream(); + } + + @Override + public boolean isRetryableException(Exception exception) + { + return true; // TODO + } + + @Override + public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) + throws RetryGiveupException + { + String message = String.format("GCS GET request failed. Retrying %d/%d after %d seconds. Message: %s", + retryCount, retryLimit, retryWait/1000, exception.getMessage()); + if (retryCount % 3 == 0) { + log.warn(message, exception); + } else { + log.warn(message); + } + } + + @Override + public void onGiveup(Exception firstException, Exception lastException) + throws RetryGiveupException + { + } + }); + } catch (RetryGiveupException ex) { + Throwables.propagateIfInstanceOf(ex.getCause(), IOException.class); + throw Throwables.propagate(ex.getCause()); + } catch (InterruptedException ex) { + throw new InterruptedIOException(); + } + } + } + public class GcsFileInput extends InputStreamFileInput implements TransactionalFileInput { public GcsFileInput(PluginTask task, int taskIndex) @@ -320,10 +393,10 @@ return null; } opened = true; Storage.Objects.Get getObject = client.objects().get(bucket, key); - return getObject.executeMediaAsInputStream(); + return new ResumableInputStream(getObject.executeMediaAsInputStream(), new GcsInputStreamReopener(client, bucket, key)); } @Override public void close() { } }