src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.9 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.10
- old
+ new
@@ -5,14 +5,17 @@
import java.util.Collections;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
+import com.google.api.client.http.HttpResponseException;
import com.google.common.collect.ImmutableList;
import com.google.common.base.Optional;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
+import com.google.common.base.Charsets;
+import com.google.common.io.BaseEncoding;
import java.security.GeneralSecurityException;
import org.embulk.config.TaskReport;
import org.embulk.config.Config;
import org.embulk.config.ConfigInject;
@@ -112,10 +115,17 @@
if (!task.getP12Keyfile().isPresent() || !task.getServiceAccountEmail().isPresent()) {
throw new ConfigException("If auth_method is private_key, you have to set both service_account_email and p12_keyfile");
}
}
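+ // @see https://cloud.google.com/storage/docs/bucket-naming (naming rules; last_path is an object name)
+ // NOTE(review): the check below rejects 128 or more characters although the message mentions 1024 bytes,
+ // presumably because base64Encode() stores the path length in a single byte -- confirm.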
+ if (task.getLastPath().isPresent()) {
+ if (task.getLastPath().get().length() >= 128) {
+ throw new ConfigException("last_path length is allowed between 1 and 1024 bytes");
+ }
+ }
+
Storage client = newGcsClient(task, newGcsAuth(task));
// list files recursively
task.setFiles(listFiles(task, client));
// number of processors is same with number of files
@@ -207,11 +217,11 @@
public static List<String> listGcsFilesByPrefix(Storage client, String bucket,
String prefix, Optional<String> lastPath)
{
ImmutableList.Builder<String> builder = ImmutableList.builder();
- String lastKey = lastPath.orNull();
+ String lastKey = lastPath.isPresent() ? base64Encode(lastPath.get()) : null;
// @see https://cloud.google.com/storage/docs/json_api/v1/objects#resource
try {
Storage.Buckets.Get getBucket = client.buckets().get(bucket);
getBucket.setProjection("full");
@@ -248,10 +258,14 @@
}
lastKey = objects.getNextPageToken();
listObjects.setPageToken(lastKey);
} while (lastKey != null);
} catch (IOException e) {
+ if ((e instanceof HttpResponseException) && ((HttpResponseException) e).getStatusCode() == 400) {
+ throw new ConfigException(String.format("Files listing failed: bucket:%s, prefix:%s, last_path:%s", bucket, prefix, lastKey), e);
+ }
+
log.warn(String.format("Could not get file list from bucket:%s", bucket));
log.warn(e.getMessage());
}
return builder.build();
@@ -312,9 +326,27 @@
}
@Override
public void close() { }
}
+
+ /**
+ * Encodes an object path in the layout of a listing page token:
+ * base64(0x0a + one length byte (0x00-0x7f) + UTF-8 bytes of the path).
+ * listGcsFilesByPrefix uses the result as its starting key when last_path is set.
+ *
+ * @param path object path to encode (the last_path option)
+ * @return the base64-encoded token
+ * @throws IllegalArgumentException if the path needs more than 127 bytes in UTF-8,
+ * because the length has to fit into the single length byte
+ */
+ private static String base64Encode(String path)
+ {
+ byte[] utf8 = path.getBytes(Charsets.UTF_8);
+ log.debug(String.format("path string: %s ,path length:%s", path, utf8.length));
+
+ // Fail loudly instead of writing a truncated or negative length byte.
+ if (utf8.length > Byte.MAX_VALUE) {
+ throw new IllegalArgumentException(String.format(
+ "last_path must be at most %d bytes in UTF-8 but was %d bytes: %s",
+ Byte.MAX_VALUE, utf8.length, path));
+ }
+
+ byte[] encoding = new byte[utf8.length + 2];
+ encoding[0] = 0x0a;
+ // The length byte describes the UTF-8 payload that follows, so it must be the byte
+ // count; String.length() counts chars and differs for any non-ASCII path.
+ encoding[1] = (byte) utf8.length;
+ System.arraycopy(utf8, 0, encoding, 2, utf8.length);
+
+ String s = BaseEncoding.base64().encode(encoding);
+ log.debug(String.format("last_path(base64 encoded): %s", s));
+ return s;
+ }
+
public enum AuthMethod
{
private_key("private_key"),
compute_engine("compute_engine"),