src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.2.5 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.2.6

- old
+ new

@@ -1,115 +1,29 @@ package org.embulk.input.gcs; -import com.google.api.client.http.HttpResponseException; -import com.google.api.client.util.IOUtils; import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.Bucket; -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.io.BaseEncoding; -import org.embulk.config.Config; -import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigException; -import org.embulk.config.ConfigInject; import org.embulk.config.ConfigSource; -import org.embulk.config.Task; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; -import org.embulk.spi.BufferAllocator; import org.embulk.spi.Exec; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.TransactionalFileInput; import org.embulk.spi.unit.LocalFile; -import org.embulk.spi.util.InputStreamFileInput; -import org.embulk.spi.util.ResumableInputStream; -import org.embulk.spi.util.RetryExecutor.RetryGiveupException; -import org.embulk.spi.util.RetryExecutor.Retryable; import org.slf4j.Logger; -import static org.embulk.spi.util.RetryExecutor.retryExecutor; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.math.BigInteger; import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; public class GcsFileInputPlugin implements FileInputPlugin { - public interface PluginTask - extends Task - { - @Config("bucket") - String getBucket(); - - @Config("path_prefix") - @ConfigDefault("null") - Optional<String> getPathPrefix(); - - @Config("last_path") - @ConfigDefault("null") - Optional<String> getLastPath(); - - @Config("incremental") - @ConfigDefault("true") - boolean getIncremental(); - - @Config("auth_method") - @ConfigDefault("\"private_key\"") - AuthMethod getAuthMethod(); - - @Config("service_account_email") - @ConfigDefault("null") - Optional<String> getServiceAccountEmail(); - - @Config("application_name") - @ConfigDefault("\"Embulk GCS input plugin\"") - String getApplicationName(); - - // kept for backward compatibility - @Config("p12_keyfile_fullpath") - @ConfigDefault("null") - Optional<String> getP12KeyfileFullpath(); - - @Config("p12_keyfile") - @ConfigDefault("null") - Optional<LocalFile> getP12Keyfile(); - void setP12Keyfile(Optional<LocalFile> p12Keyfile); - - @Config("json_keyfile") - @ConfigDefault("null") - Optional<LocalFile> getJsonKeyfile(); - - @Config("paths") - @ConfigDefault("[]") - List<String> getFiles(); - void setFiles(List<String> files); - - @Config("max_connection_retry") - @ConfigDefault("10") // 10 times retry to connect GCS server if failed. - int getMaxConnectionRetry(); - - @ConfigInject - BufferAllocator getBufferAllocator(); - } - private static final Logger log = Exec.getLogger(GcsFileInputPlugin.class); @Override public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) @@ -144,23 +58,28 @@ if (task.getLastPath().get().length() >= 128) { throw new ConfigException("last_path length is allowed between 1 and 1024 bytes"); } } - Storage client = newGcsClient(task, newGcsAuth(task)); + Storage client = GcsFileInput.newGcsClient(task, newGcsAuth(task)); // list files recursively if path_prefix is specified if (task.getPathPrefix().isPresent()) { - task.setFiles(listFiles(task, client)); + task.setFiles(GcsFileInput.listFiles(task, client)); } else { - if (task.getFiles().isEmpty()) { + if (task.getPathFiles().isEmpty()) { throw new ConfigException("No file is found. Confirm paths option isn't empty"); } + FileList.Builder builder = new FileList.Builder(config); + for (String file: task.getPathFiles()) { + builder.add(file, 1); + } + task.setFiles(builder.build()); } // number of processors is same with number of files - return resume(task.dump(), task.getFiles().size(), control); + return resume(task.dump(), task.getFiles().getTaskCount(), control); } private GcsAuthentication newGcsAuth(PluginTask task) { try { @@ -186,22 +105,12 @@ control.run(taskSource, taskCount); ConfigDiff configDiff = Exec.newConfigDiff(); - List<String> files = new ArrayList<String>(task.getFiles()); if (task.getIncremental()) { - if (files.isEmpty()) { - // keep the last value if any - if (task.getLastPath().isPresent()) { - configDiff.set("last_path", task.getLastPath().get()); - } - } - else { - Collections.sort(files); - configDiff.set("last_path", files.get(files.size() - 1)); - } + configDiff.set("last_path", task.getFiles().getLastPath(task.getLastPath())); } return configDiff; } @@ -210,23 +119,10 @@ int taskCount, List<TaskReport> successTaskReports) { } - protected Storage newGcsClient(final PluginTask task, final GcsAuthentication auth) - { - Storage client = null; - try { - client = auth.getGcsClient(task.getBucket(), task.getMaxConnectionRetry()); - } - catch (IOException ex) { - throw new ConfigException(ex); - } - - return client; - } - private Function<LocalFile, String> localFileToPathString() { return new Function<LocalFile, String>() { public String apply(LocalFile file) @@ -234,260 +130,12 @@ return file.getPath().toString(); } }; } - public List<String> listFiles(PluginTask task, Storage client) - { - String bucket = task.getBucket(); - - return listGcsFilesByPrefix(client, bucket, task.getPathPrefix().get(), task.getLastPath()); - } - - /** - * Lists GCS filenames filtered by prefix. - * - * The resulting list does not include the file that's size == 0. - */ - public static List<String> listGcsFilesByPrefix(Storage client, String bucket, - String prefix, Optional<String> lastPath) - { - ImmutableList.Builder<String> builder = ImmutableList.builder(); - - String lastKey = lastPath.isPresent() ? base64Encode(lastPath.get()) : null; - - // @see https://cloud.google.com/storage/docs/json_api/v1/objects#resource - if (log.isDebugEnabled()) { - try { - Storage.Buckets.Get getBucket = client.buckets().get(bucket); - getBucket.setProjection("full"); - Bucket bk = getBucket.execute(); - - log.debug("bucket name: " + bucket); - log.debug("bucket location: " + bk.getLocation()); - log.debug("bucket timeCreated: " + bk.getTimeCreated()); - log.debug("bucket owner: " + bk.getOwner()); - } - catch (IOException e) { - log.warn("Could not access to bucket:" + bucket); - log.warn(e.getMessage()); - } - } - - try { - // @see https://cloud.google.com/storage/docs/json_api/v1/objects/list - Storage.Objects.List listObjects = client.objects().list(bucket); - listObjects.setPrefix(prefix); - listObjects.setPageToken(lastKey); - do { - Objects objects = listObjects.execute(); - List<StorageObject> items = objects.getItems(); - if (items == null) { - log.info(String.format("No file was found in bucket:%s prefix:%s", bucket, prefix)); - break; - } - for (StorageObject o : items) { - if (o.getSize().compareTo(BigInteger.ZERO) > 0) { - builder.add(o.getName()); - } - log.debug("filename: " + o.getName()); - log.debug("updated: " + o.getUpdated()); - } - lastKey = objects.getNextPageToken(); - listObjects.setPageToken(lastKey); - } while (lastKey != null); - } - catch (IOException e) { - if ((e instanceof HttpResponseException) && ((HttpResponseException) e).getStatusCode() == 400) { - throw new ConfigException(String.format("Files listing failed: bucket:%s, prefix:%s, last_path:%s", bucket, prefix, lastKey), e); - } - - log.warn(String.format("Could not get file list from bucket:%s", bucket)); - log.warn(e.getMessage()); - } - - return builder.build(); - } - @Override public TransactionalFileInput open(TaskSource taskSource, int taskIndex) { PluginTask task = taskSource.loadTask(PluginTask.class); return new GcsFileInput(task, taskIndex); - } - - @VisibleForTesting - static class GcsInputStreamReopener - implements ResumableInputStream.Reopener - { - private final Logger log = Exec.getLogger(GcsInputStreamReopener.class); - private final File tempFile; - private final Storage client; - private final String bucket; - private final String key; - private final int maxConnectionRetry; - - public GcsInputStreamReopener(File tempFile, Storage client, String bucket, String key, int maxConnectionRetry) - { - this.tempFile = tempFile; - this.client = client; - this.bucket = bucket; - this.key = key; - this.maxConnectionRetry = maxConnectionRetry; - } - - @Override - public InputStream reopen(final long offset, final Exception closedCause) throws IOException - { - try { - return retryExecutor() - .withRetryLimit(maxConnectionRetry) - .withInitialRetryWait(500) - .withMaxRetryWait(30 * 1000) - .runInterruptible(new Retryable<InputStream>() { - @Override - public InputStream call() throws IOException - { - log.warn(String.format("GCS read failed. Retrying GET request with %,d bytes offset", offset), closedCause); - Storage.Objects.Get getObject = client.objects().get(bucket, key); - - try (BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile))) { - IOUtils.copy(getObject.executeMediaAsInputStream(), outputStream); - } - return new BufferedInputStream(new FileInputStream(tempFile)); - } - - @Override - public boolean isRetryableException(Exception exception) - { - return true; // TODO - } - - @Override - public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) - throws RetryGiveupException - { - String message = String.format("GCS GET request failed. Retrying %d/%d after %d seconds. Message: %s", - retryCount, retryLimit, retryWait / 1000, exception.getMessage()); - if (retryCount % 3 == 0) { - log.warn(message, exception); - } - else { - log.warn(message); - } - } - - @Override - public void onGiveup(Exception firstException, Exception lastException) - throws RetryGiveupException - { - } - }); - } - catch (RetryGiveupException ex) { - Throwables.propagateIfInstanceOf(ex.getCause(), IOException.class); - throw Throwables.propagate(ex.getCause()); - } - catch (InterruptedException ex) { - throw new InterruptedIOException(); - } - } - } - - public class GcsFileInput - extends InputStreamFileInput - implements TransactionalFileInput - { - public GcsFileInput(PluginTask task, int taskIndex) - { - super(task.getBufferAllocator(), new SingleFileProvider(task, taskIndex)); - } - - public void abort() - { - } - - public TaskReport commit() - { - return Exec.newTaskReport(); - } - - @Override - public void close() - { - } - } - - private class SingleFileProvider - implements InputStreamFileInput.Provider - { - private final Storage client; - private final String bucket; - private final String key; - private final int maxConnectionRetry; - private boolean opened = false; - - public SingleFileProvider(PluginTask task, int taskIndex) - { - this.client = newGcsClient(task, newGcsAuth(task)); - this.bucket = task.getBucket(); - this.key = task.getFiles().get(taskIndex); - this.maxConnectionRetry = task.getMaxConnectionRetry(); - } - - @Override - public InputStream openNext() throws IOException - { - if (opened) { - return null; - } - opened = true; - Storage.Objects.Get getObject = client.objects().get(bucket, key); - File tempFile = Exec.getTempFileSpace().createTempFile(); - try (BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile))) { - IOUtils.copy(getObject.executeMediaAsInputStream(), outputStream); - } - return new ResumableInputStream(new BufferedInputStream(new FileInputStream(tempFile)), new GcsInputStreamReopener(tempFile, client, bucket, key, maxConnectionRetry)); - } - - @Override - public void close() - { - } - } - - // String nextToken = base64Encode(0x0a + 0x01~0x27 + filePath); - private static String base64Encode(String path) - { - byte[] encoding; - byte[] utf8 = path.getBytes(Charsets.UTF_8); - log.debug(String.format("path string: %s ,path length:%s \" + ", path, utf8.length)); - - encoding = new byte[utf8.length + 2]; - encoding[0] = 0x0a; - encoding[1] = new Byte(String.valueOf(path.length())); - System.arraycopy(utf8, 0, encoding, 2, utf8.length); - - String s = BaseEncoding.base64().encode(encoding); - log.debug(String.format("last_path(base64 encoded): %s", s)); - return s; - } - - public enum AuthMethod - { - private_key("private_key"), - compute_engine("compute_engine"), - json_key("json_key"); - - private final String string; - - AuthMethod(String string) - { - this.string = string; - } - - public String getString() - { - return string; - } } }