src/main/java/org/embulk/input/gcs/GcsFileInput.java in embulk-input-gcs-0.3.0 vs src/main/java/org/embulk/input/gcs/GcsFileInput.java in embulk-input-gcs-0.3.1

- old
+ new

@@ -1,36 +1,28 @@ package org.embulk.input.gcs; -import com.google.api.client.http.HttpResponseException; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.Bucket; -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; +import com.google.api.gax.paging.Page; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.io.BaseEncoding; import org.embulk.config.ConfigException; import org.embulk.config.TaskReport; import org.embulk.spi.Exec; import org.embulk.spi.TransactionalFileInput; -import org.embulk.spi.unit.LocalFile; import org.embulk.spi.util.InputStreamFileInput; import org.slf4j.Logger; -import java.io.IOException; -import java.math.BigInteger; -import java.security.GeneralSecurityException; -import java.util.List; -import java.util.Optional; -import java.util.function.Function; - public class GcsFileInput extends InputStreamFileInput implements TransactionalFileInput { - private static final Logger log = Exec.getLogger(org.embulk.input.gcs.GcsFileInput.class); + private static final Logger LOG = Exec.getLogger(org.embulk.input.gcs.GcsFileInput.class); - public GcsFileInput(PluginTask task, int taskIndex) + GcsFileInput(PluginTask task, int taskIndex) { super(task.getBufferAllocator(), new SingleFileProvider(task, taskIndex)); } public void abort() @@ -45,151 +37,79 @@ @Override public void close() { } - public static GcsAuthentication newGcsAuth(PluginTask task) - { - try { - return new GcsAuthentication( - task.getAuthMethod().getString(), - task.getServiceAccountEmail(), - task.getP12Keyfile().map(localFileToPathString()), - task.getJsonKeyfile().map(localFileToPathString()), - task.getApplicationName() - ); - } - catch (GeneralSecurityException | IOException ex) { - throw new ConfigException(ex); - } - } - - protected static Storage newGcsClient(final PluginTask task, final GcsAuthentication auth) - { - Storage client = null; - try { - client = auth.getGcsClient(task.getBucket(), task.getMaxConnectionRetry()); - } - catch (IOException ex) { - throw new ConfigException(ex); - } - - return client; - } - - private static Function<LocalFile, String> localFileToPathString() - { - return new Function<LocalFile, String>() - { - public String apply(LocalFile file) - { - return file.getPath().toString(); - } - }; - } - - public static FileList listFiles(PluginTask task, Storage client) - { - String bucket = task.getBucket(); - - FileList.Builder builder = new FileList.Builder(task); - listGcsFilesByPrefix(builder, client, bucket, task.getPathPrefix().get(), task.getLastPath()); - return builder.build(); - } - /** * Lists GCS filenames filtered by prefix. * * The resulting list does not include the file that's size == 0. */ - public static void listGcsFilesByPrefix(FileList.Builder builder, Storage client, String bucket, - String prefix, Optional<String> lastPath) + static FileList listFiles(PluginTask task) { - String lastKey = lastPath.isPresent() ? base64Encode(lastPath.get()) : null; + Storage client = AuthUtils.newClient(task); + String bucket = task.getBucket(); - // @see https://cloud.google.com/storage/docs/json_api/v1/objects#resource - if (log.isDebugEnabled()) { - try { - Storage.Buckets.Get getBucket = client.buckets().get(bucket); - getBucket.setProjection("full"); - Bucket bk = getBucket.execute(); - - log.debug("bucket name: " + bucket); - log.debug("bucket location: " + bk.getLocation()); - log.debug("bucket timeCreated: " + bk.getTimeCreated()); - log.debug("bucket owner: " + bk.getOwner()); - } - catch (IOException e) { - log.warn("Could not access to bucket:" + bucket); - log.warn(e.getMessage()); - } + // @see https://cloud.google.com/storage/docs/json_api/v1/buckets/get + if (LOG.isDebugEnabled()) { + printBucketInfo(client, bucket); } + String prefix = task.getPathPrefix().orElse(""); + String lastKey = task.getLastPath().isPresent() ? base64Encode(task.getLastPath().get()) : ""; + FileList.Builder builder = new FileList.Builder(task); + try { // @see https://cloud.google.com/storage/docs/json_api/v1/objects/list - Storage.Objects.List listObjects = client.objects().list(bucket); - listObjects.setPrefix(prefix); - listObjects.setPageToken(lastKey); - do { - Objects objects = listObjects.execute(); - List<StorageObject> items = objects.getItems(); - if (items == null) { - log.info(String.format("No file was found in bucket:%s prefix:%s", bucket, prefix)); - break; + Page<Blob> blobs = client.list(bucket, Storage.BlobListOption.prefix(prefix), Storage.BlobListOption.pageToken(lastKey)); + for (Blob blob : blobs.iterateAll()) { + if (blob.getSize() > 0) { + builder.add(blob.getName(), blob.getSize()); } - for (StorageObject o : items) { - if (o.getSize().compareTo(BigInteger.ZERO) > 0) { - builder.add(o.getName(), o.getSize().longValue()); - } - log.debug("filename: " + o.getName()); - log.debug("updated: " + o.getUpdated()); - } - lastKey = objects.getNextPageToken(); - listObjects.setPageToken(lastKey); - } while (lastKey != null); + LOG.debug("filename: {}", blob.getName()); + LOG.debug("updated: {}", blob.getUpdateTime()); + } } - catch (IOException e) { - if ((e instanceof HttpResponseException) && ((HttpResponseException) e).getStatusCode() == 400) { + catch (RuntimeException e) { + if ((e instanceof StorageException) && ((StorageException) e).getCode() == 400) { throw new ConfigException(String.format("Files listing failed: bucket:%s, prefix:%s, last_path:%s", bucket, prefix, lastKey), e); } - log.warn(String.format("Could not get file list from bucket:%s", bucket)); - log.warn(e.getMessage()); + LOG.warn(String.format("Could not get file list from bucket:%s", bucket)); + LOG.warn(e.getMessage()); } + return builder.build(); } // String nextToken = base64Encode(0x0a + 0x01~0x27 + filePath); - private static String base64Encode(String path) + @VisibleForTesting + static String base64Encode(String path) { byte[] encoding; byte[] utf8 = path.getBytes(Charsets.UTF_8); - log.debug(String.format("path string: %s ,path length:%s \" + ", path, utf8.length)); + LOG.debug("path string: {} ,path length:{} \" + ", path, utf8.length); encoding = new byte[utf8.length + 2]; encoding[0] = 0x0a; - encoding[1] = new Byte(String.valueOf(path.length())); + encoding[1] = Byte.valueOf(String.valueOf(path.length())); System.arraycopy(utf8, 0, encoding, 2, utf8.length); String s = BaseEncoding.base64().encode(encoding); - log.debug(String.format("last_path(base64 encoded): %s", s)); + LOG.debug("last_path(base64 encoded): {}", s); return s; } - public enum AuthMethod + private static void printBucketInfo(Storage client, String bucket) { - private_key("private_key"), - compute_engine("compute_engine"), - json_key("json_key"); - - private final String string; - - AuthMethod(String string) - { - this.string = string; - } - - public String getString() - { - return string; - } + // get Bucket + Storage.BucketGetOption fields = Storage.BucketGetOption.fields( + Storage.BucketField.LOCATION, + Storage.BucketField.TIME_CREATED, + Storage.BucketField.OWNER + ); + com.google.cloud.storage.Bucket bk = client.get(bucket, fields); + LOG.debug("bucket name: {}", bk.getName()); + LOG.debug("bucket location: {}", bk.getLocation()); + LOG.debug("bucket timeCreated: {}", bk.getCreateTime()); + LOG.debug("bucket owner: {}", bk.getOwner()); } }