src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.6 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.7
- old
+ new
@@ -4,12 +4,15 @@
import java.util.ArrayList;
import java.util.Collections;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
+
import com.google.common.collect.ImmutableList;
import com.google.common.base.Optional;
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
import java.security.GeneralSecurityException;
import org.embulk.config.TaskReport;
import org.embulk.config.Config;
import org.embulk.config.ConfigInject;
@@ -21,10 +24,11 @@
import org.embulk.config.TaskSource;
import org.embulk.spi.Exec;
import org.embulk.spi.BufferAllocator;
import org.embulk.spi.FileInputPlugin;
import org.embulk.spi.TransactionalFileInput;
+import org.embulk.spi.unit.LocalFile;
import org.embulk.spi.util.InputStreamFileInput;
import org.slf4j.Logger;
import com.google.api.services.storage.Storage;
@@ -58,29 +62,73 @@
@Config("application_name")
@ConfigDefault("\"Embulk GCS input plugin\"")
String getApplicationName();
+ // kept for backward compatibility
@Config("p12_keyfile_fullpath")
@ConfigDefault("null")
Optional<String> getP12KeyfileFullpath();
+ @Config("p12_keyfile")
+ @ConfigDefault("null")
+ Optional<LocalFile> getP12Keyfile();
+ void setP12Keyfile(Optional<LocalFile> p12Keyfile);
+
+ @Config("json_keyfile")
+ @ConfigDefault("null")
+ Optional<LocalFile> getJsonKeyfile();
+
List<String> getFiles();
void setFiles(List<String> files);
@ConfigInject
BufferAllocator getBufferAllocator();
}
private static final Logger log = Exec.getLogger(GcsFileInputPlugin.class);
+ private static GcsAuthentication auth;
@Override
public ConfigDiff transaction(ConfigSource config,
FileInputPlugin.Control control)
{
PluginTask task = config.loadConfig(PluginTask.class);
+ if (task.getP12KeyfileFullpath().isPresent()) {
+ if (task.getP12Keyfile().isPresent()) {
+ throw new ConfigException("Setting both p12_keyfile_fullpath and p12_keyfile is invalid");
+ }
+ try {
+ task.setP12Keyfile(Optional.of(LocalFile.of(task.getP12KeyfileFullpath().get())));
+ } catch (IOException ex) {
+ throw Throwables.propagate(ex);
+ }
+ }
+
+ if (task.getAuthMethod().getString().equals("json_key")) {
+ if (!task.getJsonKeyfile().isPresent()) {
+ throw new ConfigException("If auth_method is json_key, you have to set json_keyfile");
+ }
+ } else if (task.getAuthMethod().getString().equals("private_key")) {
+ if (!task.getP12Keyfile().isPresent() || !task.getServiceAccountEmail().isPresent()) {
+ throw new ConfigException("If auth_method is private_key, you have to set both service_account_email and p12_keyfile");
+ }
+ }
+
+ try {
+ auth = new GcsAuthentication(
+ task.getAuthMethod().getString(),
+ task.getServiceAccountEmail(),
+ task.getP12Keyfile().transform(localFileToPathString()),
+ task.getJsonKeyfile().transform(localFileToPathString()),
+ task.getApplicationName()
+ );
+ } catch (GeneralSecurityException | IOException ex) {
+ throw new ConfigException(ex);
+ }
+
// list files recursively
task.setFiles(listFiles(task));
// number of processors is same with number of files
return resume(task.dump(), task.getFiles().size(), control);
}
@@ -115,22 +163,33 @@
int taskCount,
List<TaskReport> successTaskReports)
{
}
- private static Storage newGcsClient(final PluginTask task) {
+ private static Storage newGcsClient(final PluginTask task)
+ {
Storage client = null;
try {
- GcsAuthentication auth = new GcsAuthentication(task.getAuthMethod().getString(), task.getServiceAccountEmail(), task.getP12KeyfileFullpath(), task.getApplicationName());
client = auth.getGcsClient(task.getBucket());
- } catch (GeneralSecurityException | IOException ex) {
+ } catch (IOException ex) {
throw new ConfigException(ex);
}
return client;
}
+ private Function<LocalFile, String> localFileToPathString()
+ {
+ return new Function<LocalFile, String>()
+ {
+ public String apply(LocalFile file)
+ {
+ return file.getPath().toString();
+ }
+ };
+ }
+
public List<String> listFiles(PluginTask task)
{
Storage client = newGcsClient(task);
String bucket = task.getBucket();
@@ -257,10 +316,11 @@
}
public enum AuthMethod
{
private_key("private_key"),
- compute_engine("compute_engine");
+ compute_engine("compute_engine"),
+ json_key("json_key");
private final String string;
AuthMethod(String string)
{