src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.9 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.10

- old
+ new

@@ -5,14 +5,17 @@ import java.util.Collections; import java.io.IOException; import java.io.InputStream; import java.math.BigInteger; +import com.google.api.client.http.HttpResponseException; import com.google.common.collect.ImmutableList; import com.google.common.base.Optional; import com.google.common.base.Function; import com.google.common.base.Throwables; +import com.google.common.base.Charsets; +import com.google.common.io.BaseEncoding; import java.security.GeneralSecurityException; import org.embulk.config.TaskReport; import org.embulk.config.Config; import org.embulk.config.ConfigInject; @@ -112,10 +115,17 @@ if (!task.getP12Keyfile().isPresent() || !task.getServiceAccountEmail().isPresent()) { throw new ConfigException("If auth_method is private_key, you have to set both service_account_email and p12_keyfile"); } } + // @see https://cloud.google.com/storage/docs/bucket-naming + if (task.getLastPath().isPresent()) { + if (task.getLastPath().get().length() >= 128) { + throw new ConfigException("last_path length is allowed between 1 and 1024 bytes"); + } + } + Storage client = newGcsClient(task, newGcsAuth(task)); // list files recursively task.setFiles(listFiles(task, client)); // number of processors is same with number of files @@ -207,11 +217,11 @@ public static List<String> listGcsFilesByPrefix(Storage client, String bucket, String prefix, Optional<String> lastPath) { ImmutableList.Builder<String> builder = ImmutableList.builder(); - String lastKey = lastPath.orNull(); + String lastKey = lastPath.isPresent() ? base64Encode(lastPath.get()) : null; // @see https://cloud.google.com/storage/docs/json_api/v1/objects#resource try { Storage.Buckets.Get getBucket = client.buckets().get(bucket); getBucket.setProjection("full"); @@ -248,10 +258,14 @@ } lastKey = objects.getNextPageToken(); listObjects.setPageToken(lastKey); } while (lastKey != null); } catch (IOException e) { + if ((e instanceof HttpResponseException) && ((HttpResponseException) e).getStatusCode() == 400) { + throw new ConfigException(String.format("Files listing failed: bucket:%s, prefix:%s, last_path:%s", bucket, prefix, lastKey), e); + } + log.warn(String.format("Could not get file list from bucket:%s", bucket)); log.warn(e.getMessage()); } return builder.build(); @@ -312,9 +326,27 @@ } @Override public void close() { } } + + // String nextToken = base64Encode(0x0a + 0x01~0x27 + filePath); + private static String base64Encode(String path) + { + byte[] encoding; + byte[] utf8 = path.getBytes(Charsets.UTF_8); + log.debug(String.format("path string: %s ,path length:%s \" + ", path, utf8.length)); + + encoding = new byte[utf8.length + 2]; + encoding[0] = 0x0a; + encoding[1] = new Byte(String.valueOf(path.length())); + System.arraycopy(utf8, 0, encoding, 2, utf8.length); + + String s = BaseEncoding.base64().encode(encoding); + log.debug(String.format("last_path(base64 encoded): %s" ,s)); + return s; + } + public enum AuthMethod { private_key("private_key"), compute_engine("compute_engine"),