src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.3.0 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.3.1

- old
+ new

@@ -1,31 +1,25 @@ package org.embulk.input.gcs; -import com.google.api.services.storage.Storage; import com.google.common.base.Throwables; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigException; import org.embulk.config.ConfigSource; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.Exec; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.TransactionalFileInput; import org.embulk.spi.unit.LocalFile; -import org.slf4j.Logger; import java.io.IOException; -import java.security.GeneralSecurityException; import java.util.List; import java.util.Optional; -import java.util.function.Function; public class GcsFileInputPlugin implements FileInputPlugin { - private static final Logger log = Exec.getLogger(GcsFileInputPlugin.class); - @Override public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); @@ -40,16 +34,16 @@ catch (IOException ex) { throw Throwables.propagate(ex); } } - if (task.getAuthMethod().getString().equals("json_key")) { + if (AuthUtils.AuthMethod.json_key.equals(task.getAuthMethod())) { if (!task.getJsonKeyfile().isPresent()) { throw new ConfigException("If auth_method is json_key, you have to set json_keyfile"); } } - else if (task.getAuthMethod().getString().equals("private_key")) { + else if (AuthUtils.AuthMethod.private_key.equals(task.getAuthMethod())) { if (!task.getP12Keyfile().isPresent() || !task.getServiceAccountEmail().isPresent()) { throw new ConfigException("If auth_method is private_key, you have to set both service_account_email and p12_keyfile"); } } @@ -58,15 +52,13 @@ if (task.getLastPath().get().length() >= 128) { throw new ConfigException("last_path length is allowed between 1 and 1024 bytes"); } } - Storage client = GcsFileInput.newGcsClient(task, newGcsAuth(task)); - // list files recursively if path_prefix is specified if (task.getPathPrefix().isPresent()) { - task.setFiles(GcsFileInput.listFiles(task, client)); + task.setFiles(GcsFileInput.listFiles(task)); } else { if (task.getPathFiles().isEmpty()) { throw new ConfigException("No file is found. Confirm paths option isn't empty"); } @@ -78,26 +70,10 @@ } // number of processors is same with number of files return resume(task.dump(), task.getFiles().getTaskCount(), control); } - private GcsAuthentication newGcsAuth(PluginTask task) - { - try { - return new GcsAuthentication( - task.getAuthMethod().getString(), - task.getServiceAccountEmail(), - task.getP12Keyfile().map(localFileToPathString()), - task.getJsonKeyfile().map(localFileToPathString()), - task.getApplicationName() - ); - } - catch (GeneralSecurityException | IOException ex) { - throw new ConfigException(ex); - } - } - @Override public ConfigDiff resume(TaskSource taskSource, int taskCount, FileInputPlugin.Control control) { @@ -117,20 +93,9 @@ @Override public void cleanup(TaskSource taskSource, int taskCount, List<TaskReport> successTaskReports) { - } - - private Function<LocalFile, String> localFileToPathString() - { - return new Function<LocalFile, String>() - { - public String apply(LocalFile file) - { - return file.getPath().toString(); - } - }; } @Override public TransactionalFileInput open(TaskSource taskSource, int taskIndex) {