src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.2.4 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.2.5

- old
+ new

@@ -1,8 +1,9 @@ package org.embulk.input.gcs; import com.google.api.client.http.HttpResponseException; +import com.google.api.client.util.IOUtils; import com.google.api.services.storage.Storage; import com.google.api.services.storage.model.Bucket; import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import com.google.common.annotations.VisibleForTesting; @@ -31,10 +32,15 @@ import org.embulk.spi.util.RetryExecutor.RetryGiveupException; import org.embulk.spi.util.RetryExecutor.Retryable; import org.slf4j.Logger; import static org.embulk.spi.util.RetryExecutor.retryExecutor; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; import java.math.BigInteger; import java.security.GeneralSecurityException; @@ -248,24 +254,26 @@ ImmutableList.Builder<String> builder = ImmutableList.builder(); String lastKey = lastPath.isPresent() ? base64Encode(lastPath.get()) : null; // @see https://cloud.google.com/storage/docs/json_api/v1/objects#resource - try { - Storage.Buckets.Get getBucket = client.buckets().get(bucket); - getBucket.setProjection("full"); - Bucket bk = getBucket.execute(); + if (log.isDebugEnabled()) { + try { + Storage.Buckets.Get getBucket = client.buckets().get(bucket); + getBucket.setProjection("full"); + Bucket bk = getBucket.execute(); - log.debug("bucket name: " + bucket); - log.debug("bucket location: " + bk.getLocation()); - log.debug("bucket timeCreated: " + bk.getTimeCreated()); - log.debug("bucket owner: " + bk.getOwner()); + log.debug("bucket name: " + bucket); + log.debug("bucket location: " + bk.getLocation()); + log.debug("bucket timeCreated: " + bk.getTimeCreated()); + log.debug("bucket owner: " + bk.getOwner()); + } + catch (IOException e) { + log.warn("Could not access to bucket:" + bucket); + log.warn(e.getMessage()); + } } - catch (IOException e) { - log.warn("Could not access to bucket:" + bucket); - log.warn(e.getMessage()); - } try { // @see https://cloud.google.com/storage/docs/json_api/v1/objects/list Storage.Objects.List listObjects = client.objects().list(bucket); listObjects.setPrefix(prefix); @@ -310,17 +318,19 @@ @VisibleForTesting static class GcsInputStreamReopener implements ResumableInputStream.Reopener { private final Logger log = Exec.getLogger(GcsInputStreamReopener.class); + private final File tempFile; private final Storage client; private final String bucket; private final String key; private final int maxConnectionRetry; - public GcsInputStreamReopener(Storage client, String bucket, String key, int maxConnectionRetry) + public GcsInputStreamReopener(File tempFile, Storage client, String bucket, String key, int maxConnectionRetry) { + this.tempFile = tempFile; this.client = client; this.bucket = bucket; this.key = key; this.maxConnectionRetry = maxConnectionRetry; } @@ -333,15 +343,19 @@ .withRetryLimit(maxConnectionRetry) .withInitialRetryWait(500) .withMaxRetryWait(30 * 1000) .runInterruptible(new Retryable<InputStream>() { @Override - public InputStream call() throws InterruptedIOException, IOException + public InputStream call() throws IOException { log.warn(String.format("GCS read failed. Retrying GET request with %,d bytes offset", offset), closedCause); Storage.Objects.Get getObject = client.objects().get(bucket, key); - return getObject.executeMediaAsInputStream(); + + try (BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile))) { + IOUtils.copy(getObject.executeMediaAsInputStream(), outputStream); + } + return new BufferedInputStream(new FileInputStream(tempFile)); } @Override public boolean isRetryableException(Exception exception) { @@ -426,11 +440,14 @@ if (opened) { return null; } opened = true; Storage.Objects.Get getObject = client.objects().get(bucket, key); - - return new ResumableInputStream(getObject.executeMediaAsInputStream(), new GcsInputStreamReopener(client, bucket, key, maxConnectionRetry)); + File tempFile = Exec.getTempFileSpace().createTempFile(); + try (BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile))) { + IOUtils.copy(getObject.executeMediaAsInputStream(), outputStream); + } + return new ResumableInputStream(new BufferedInputStream(new FileInputStream(tempFile)), new GcsInputStreamReopener(tempFile, client, bucket, key, maxConnectionRetry)); } @Override public void close() {