src/main/java/org/embulk/input/gcs/GcsFileInput.java in embulk-input-gcs-0.3.0 vs src/main/java/org/embulk/input/gcs/GcsFileInput.java in embulk-input-gcs-0.3.1
- old
+ new
@@ -1,36 +1,28 @@
package org.embulk.input.gcs;
-import com.google.api.client.http.HttpResponseException;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.model.Bucket;
-import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.StorageObject;
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.io.BaseEncoding;
import org.embulk.config.ConfigException;
import org.embulk.config.TaskReport;
import org.embulk.spi.Exec;
import org.embulk.spi.TransactionalFileInput;
-import org.embulk.spi.unit.LocalFile;
import org.embulk.spi.util.InputStreamFileInput;
import org.slf4j.Logger;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.security.GeneralSecurityException;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Function;
-
public class GcsFileInput
extends InputStreamFileInput
implements TransactionalFileInput
{
- private static final Logger log = Exec.getLogger(org.embulk.input.gcs.GcsFileInput.class);
+ private static final Logger LOG = Exec.getLogger(org.embulk.input.gcs.GcsFileInput.class);
- public GcsFileInput(PluginTask task, int taskIndex)
+ GcsFileInput(PluginTask task, int taskIndex)
{
super(task.getBufferAllocator(), new SingleFileProvider(task, taskIndex));
}
public void abort()
@@ -45,151 +37,79 @@
@Override
public void close()
{
}
- public static GcsAuthentication newGcsAuth(PluginTask task)
- {
- try {
- return new GcsAuthentication(
- task.getAuthMethod().getString(),
- task.getServiceAccountEmail(),
- task.getP12Keyfile().map(localFileToPathString()),
- task.getJsonKeyfile().map(localFileToPathString()),
- task.getApplicationName()
- );
- }
- catch (GeneralSecurityException | IOException ex) {
- throw new ConfigException(ex);
- }
- }
-
- protected static Storage newGcsClient(final PluginTask task, final GcsAuthentication auth)
- {
- Storage client = null;
- try {
- client = auth.getGcsClient(task.getBucket(), task.getMaxConnectionRetry());
- }
- catch (IOException ex) {
- throw new ConfigException(ex);
- }
-
- return client;
- }
-
- private static Function<LocalFile, String> localFileToPathString()
- {
- return new Function<LocalFile, String>()
- {
- public String apply(LocalFile file)
- {
- return file.getPath().toString();
- }
- };
- }
-
- public static FileList listFiles(PluginTask task, Storage client)
- {
- String bucket = task.getBucket();
-
- FileList.Builder builder = new FileList.Builder(task);
- listGcsFilesByPrefix(builder, client, bucket, task.getPathPrefix().get(), task.getLastPath());
- return builder.build();
- }
-
/**
* Lists GCS filenames filtered by prefix.
*
* The resulting list does not include the file that's size == 0.
*/
- public static void listGcsFilesByPrefix(FileList.Builder builder, Storage client, String bucket,
- String prefix, Optional<String> lastPath)
+ static FileList listFiles(PluginTask task)
{
- String lastKey = lastPath.isPresent() ? base64Encode(lastPath.get()) : null;
+ Storage client = AuthUtils.newClient(task);
+ String bucket = task.getBucket();
- // @see https://cloud.google.com/storage/docs/json_api/v1/objects#resource
- if (log.isDebugEnabled()) {
- try {
- Storage.Buckets.Get getBucket = client.buckets().get(bucket);
- getBucket.setProjection("full");
- Bucket bk = getBucket.execute();
-
- log.debug("bucket name: " + bucket);
- log.debug("bucket location: " + bk.getLocation());
- log.debug("bucket timeCreated: " + bk.getTimeCreated());
- log.debug("bucket owner: " + bk.getOwner());
- }
- catch (IOException e) {
- log.warn("Could not access to bucket:" + bucket);
- log.warn(e.getMessage());
- }
+ // @see https://cloud.google.com/storage/docs/json_api/v1/buckets/get
+ if (LOG.isDebugEnabled()) {
+ printBucketInfo(client, bucket);
}
+ String prefix = task.getPathPrefix().orElse("");
+ String lastKey = task.getLastPath().isPresent() ? base64Encode(task.getLastPath().get()) : "";
+ FileList.Builder builder = new FileList.Builder(task);
+
try {
// @see https://cloud.google.com/storage/docs/json_api/v1/objects/list
- Storage.Objects.List listObjects = client.objects().list(bucket);
- listObjects.setPrefix(prefix);
- listObjects.setPageToken(lastKey);
- do {
- Objects objects = listObjects.execute();
- List<StorageObject> items = objects.getItems();
- if (items == null) {
- log.info(String.format("No file was found in bucket:%s prefix:%s", bucket, prefix));
- break;
+ Page<Blob> blobs = client.list(bucket, Storage.BlobListOption.prefix(prefix), Storage.BlobListOption.pageToken(lastKey));
+ for (Blob blob : blobs.iterateAll()) {
+ if (blob.getSize() > 0) {
+ builder.add(blob.getName(), blob.getSize());
}
- for (StorageObject o : items) {
- if (o.getSize().compareTo(BigInteger.ZERO) > 0) {
- builder.add(o.getName(), o.getSize().longValue());
- }
- log.debug("filename: " + o.getName());
- log.debug("updated: " + o.getUpdated());
- }
- lastKey = objects.getNextPageToken();
- listObjects.setPageToken(lastKey);
- } while (lastKey != null);
+ LOG.debug("filename: {}", blob.getName());
+ LOG.debug("updated: {}", blob.getUpdateTime());
+ }
}
- catch (IOException e) {
- if ((e instanceof HttpResponseException) && ((HttpResponseException) e).getStatusCode() == 400) {
+ catch (RuntimeException e) {
+ if ((e instanceof StorageException) && ((StorageException) e).getCode() == 400) {
throw new ConfigException(String.format("Files listing failed: bucket:%s, prefix:%s, last_path:%s", bucket, prefix, lastKey), e);
}
- log.warn(String.format("Could not get file list from bucket:%s", bucket));
- log.warn(e.getMessage());
+ LOG.warn(String.format("Could not get file list from bucket:%s", bucket));
+ LOG.warn(e.getMessage());
}
+ return builder.build();
}
// String nextToken = base64Encode(0x0a + 0x01~0x27 + filePath);
- private static String base64Encode(String path)
+ @VisibleForTesting
+ static String base64Encode(String path)
{
byte[] encoding;
byte[] utf8 = path.getBytes(Charsets.UTF_8);
- log.debug(String.format("path string: %s ,path length:%s \" + ", path, utf8.length));
+ LOG.debug("path string: {} ,path length:{} \" + ", path, utf8.length);
encoding = new byte[utf8.length + 2];
encoding[0] = 0x0a;
- encoding[1] = new Byte(String.valueOf(path.length()));
+ encoding[1] = Byte.valueOf(String.valueOf(path.length()));
System.arraycopy(utf8, 0, encoding, 2, utf8.length);
String s = BaseEncoding.base64().encode(encoding);
- log.debug(String.format("last_path(base64 encoded): %s", s));
+ LOG.debug("last_path(base64 encoded): {}", s);
return s;
}
- public enum AuthMethod
+ private static void printBucketInfo(Storage client, String bucket)
{
- private_key("private_key"),
- compute_engine("compute_engine"),
- json_key("json_key");
-
- private final String string;
-
- AuthMethod(String string)
- {
- this.string = string;
- }
-
- public String getString()
- {
- return string;
- }
+ // get Bucket
+ Storage.BucketGetOption fields = Storage.BucketGetOption.fields(
+ Storage.BucketField.LOCATION,
+ Storage.BucketField.TIME_CREATED,
+ Storage.BucketField.OWNER
+ );
+ com.google.cloud.storage.Bucket bk = client.get(bucket, fields);
+ LOG.debug("bucket name: {}", bk.getName());
+ LOG.debug("bucket location: {}", bk.getLocation());
+ LOG.debug("bucket timeCreated: {}", bk.getCreateTime());
+ LOG.debug("bucket owner: {}", bk.getOwner());
}
}