src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.2.4 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.2.5
- old
+ new
@@ -1,8 +1,9 @@
package org.embulk.input.gcs;
import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.util.IOUtils;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.annotations.VisibleForTesting;
@@ -31,10 +32,15 @@
import org.embulk.spi.util.RetryExecutor.RetryGiveupException;
import org.embulk.spi.util.RetryExecutor.Retryable;
import org.slf4j.Logger;
import static org.embulk.spi.util.RetryExecutor.retryExecutor;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.math.BigInteger;
import java.security.GeneralSecurityException;
@@ -248,24 +254,26 @@
ImmutableList.Builder<String> builder = ImmutableList.builder();
String lastKey = lastPath.isPresent() ? base64Encode(lastPath.get()) : null;
// @see https://cloud.google.com/storage/docs/json_api/v1/objects#resource
- try {
- Storage.Buckets.Get getBucket = client.buckets().get(bucket);
- getBucket.setProjection("full");
- Bucket bk = getBucket.execute();
+ if (log.isDebugEnabled()) {
+ try {
+ Storage.Buckets.Get getBucket = client.buckets().get(bucket);
+ getBucket.setProjection("full");
+ Bucket bk = getBucket.execute();
- log.debug("bucket name: " + bucket);
- log.debug("bucket location: " + bk.getLocation());
- log.debug("bucket timeCreated: " + bk.getTimeCreated());
- log.debug("bucket owner: " + bk.getOwner());
+ log.debug("bucket name: " + bucket);
+ log.debug("bucket location: " + bk.getLocation());
+ log.debug("bucket timeCreated: " + bk.getTimeCreated());
+ log.debug("bucket owner: " + bk.getOwner());
+ }
+ catch (IOException e) {
+ log.warn("Could not access to bucket:" + bucket);
+ log.warn(e.getMessage());
+ }
}
- catch (IOException e) {
- log.warn("Could not access to bucket:" + bucket);
- log.warn(e.getMessage());
- }
try {
// @see https://cloud.google.com/storage/docs/json_api/v1/objects/list
Storage.Objects.List listObjects = client.objects().list(bucket);
listObjects.setPrefix(prefix);
@@ -310,17 +318,19 @@
@VisibleForTesting
static class GcsInputStreamReopener
implements ResumableInputStream.Reopener
{
private final Logger log = Exec.getLogger(GcsInputStreamReopener.class);
+ private final File tempFile;
private final Storage client;
private final String bucket;
private final String key;
private final int maxConnectionRetry;
- public GcsInputStreamReopener(Storage client, String bucket, String key, int maxConnectionRetry)
+ public GcsInputStreamReopener(File tempFile, Storage client, String bucket, String key, int maxConnectionRetry)
{
+ this.tempFile = tempFile;
this.client = client;
this.bucket = bucket;
this.key = key;
this.maxConnectionRetry = maxConnectionRetry;
}
@@ -333,15 +343,19 @@
.withRetryLimit(maxConnectionRetry)
.withInitialRetryWait(500)
.withMaxRetryWait(30 * 1000)
.runInterruptible(new Retryable<InputStream>() {
@Override
- public InputStream call() throws InterruptedIOException, IOException
+ public InputStream call() throws IOException
{
log.warn(String.format("GCS read failed. Retrying GET request with %,d bytes offset", offset), closedCause);
Storage.Objects.Get getObject = client.objects().get(bucket, key);
- return getObject.executeMediaAsInputStream();
+
+ try (BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile))) {
+ IOUtils.copy(getObject.executeMediaAsInputStream(), outputStream);
+ }
+ return new BufferedInputStream(new FileInputStream(tempFile));
}
@Override
public boolean isRetryableException(Exception exception)
{
@@ -426,11 +440,14 @@
if (opened) {
return null;
}
opened = true;
Storage.Objects.Get getObject = client.objects().get(bucket, key);
-
- return new ResumableInputStream(getObject.executeMediaAsInputStream(), new GcsInputStreamReopener(client, bucket, key, maxConnectionRetry));
+ File tempFile = Exec.getTempFileSpace().createTempFile();
+ try (BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile))) {
+ IOUtils.copy(getObject.executeMediaAsInputStream(), outputStream);
+ }
+ return new ResumableInputStream(new BufferedInputStream(new FileInputStream(tempFile)), new GcsInputStreamReopener(tempFile, client, bucket, key, maxConnectionRetry));
}
@Override
public void close()
{