src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.6 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.7

- old
+ new

@@ -4,12 +4,15 @@ import java.util.ArrayList; import java.util.Collections; import java.io.IOException; import java.io.InputStream; import java.math.BigInteger; + import com.google.common.collect.ImmutableList; import com.google.common.base.Optional; +import com.google.common.base.Function; +import com.google.common.base.Throwables; import java.security.GeneralSecurityException; import org.embulk.config.TaskReport; import org.embulk.config.Config; import org.embulk.config.ConfigInject; @@ -21,10 +24,11 @@ import org.embulk.config.TaskSource; import org.embulk.spi.Exec; import org.embulk.spi.BufferAllocator; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.TransactionalFileInput; +import org.embulk.spi.unit.LocalFile; import org.embulk.spi.util.InputStreamFileInput; import org.slf4j.Logger; import com.google.api.services.storage.Storage; @@ -58,29 +62,73 @@ @Config("application_name") @ConfigDefault("\"Embulk GCS input plugin\"") String getApplicationName(); + // kept for backward compatibility @Config("p12_keyfile_fullpath") @ConfigDefault("null") Optional<String> getP12KeyfileFullpath(); + @Config("p12_keyfile") + @ConfigDefault("null") + Optional<LocalFile> getP12Keyfile(); + void setP12Keyfile(Optional<LocalFile> p12Keyfile); + + @Config("json_keyfile") + @ConfigDefault("null") + Optional<LocalFile> getJsonKeyfile(); + List<String> getFiles(); void setFiles(List<String> files); @ConfigInject BufferAllocator getBufferAllocator(); } private static final Logger log = Exec.getLogger(GcsFileInputPlugin.class); + private static GcsAuthentication auth; @Override public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); + if (task.getP12KeyfileFullpath().isPresent()) { + if (task.getP12Keyfile().isPresent()) { + throw new ConfigException("Setting both p12_keyfile_fullpath and p12_keyfile is invalid"); + } + try { + task.setP12Keyfile(Optional.of(LocalFile.of(task.getP12KeyfileFullpath().get()))); + } catch (IOException ex) { + throw Throwables.propagate(ex); + } + } + + if (task.getAuthMethod().getString().equals("json_key")) { + if (!task.getJsonKeyfile().isPresent()) { + throw new ConfigException("If auth_method is json_key, you have to set json_keyfile"); + } + } else if (task.getAuthMethod().getString().equals("private_key")) { + if (!task.getP12Keyfile().isPresent() || !task.getServiceAccountEmail().isPresent()) { + throw new ConfigException("If auth_method is private_key, you have to set both service_account_email and p12_keyfile"); + } + } + + try { + auth = new GcsAuthentication( + task.getAuthMethod().getString(), + task.getServiceAccountEmail(), + task.getP12Keyfile().transform(localFileToPathString()), + task.getJsonKeyfile().transform(localFileToPathString()), + task.getApplicationName() + ); + } catch (GeneralSecurityException | IOException ex) { + throw new ConfigException(ex); + } + // list files recursively task.setFiles(listFiles(task)); // number of processors is same with number of files return resume(task.dump(), task.getFiles().size(), control); } @@ -115,22 +163,33 @@ int taskCount, List<TaskReport> successTaskReports) { } - private static Storage newGcsClient(final PluginTask task) { + private static Storage newGcsClient(final PluginTask task) + { Storage client = null; try { - GcsAuthentication auth = new GcsAuthentication(task.getAuthMethod().getString(), task.getServiceAccountEmail(), task.getP12KeyfileFullpath(), task.getApplicationName()); client = auth.getGcsClient(task.getBucket()); - } catch (GeneralSecurityException | IOException ex) { + } catch (IOException ex) { throw new ConfigException(ex); } return client; } + private Function<LocalFile, String> localFileToPathString() + { + return new Function<LocalFile, String>() + { + public String apply(LocalFile file) + { + return file.getPath().toString(); + } + }; + } + public List<String> listFiles(PluginTask task) { Storage client = newGcsClient(task); String bucket = task.getBucket(); @@ -257,10 +316,11 @@ } public enum AuthMethod { private_key("private_key"), - compute_engine("compute_engine"); + compute_engine("compute_engine"), + json_key("json_key"); private final String string; AuthMethod(String string) {