src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.3.0 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.3.1
- old
+ new
@@ -1,31 +1,25 @@
package org.embulk.input.gcs;
-import com.google.api.services.storage.Storage;
import com.google.common.base.Throwables;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInputPlugin;
import org.embulk.spi.TransactionalFileInput;
import org.embulk.spi.unit.LocalFile;
-import org.slf4j.Logger;
import java.io.IOException;
-import java.security.GeneralSecurityException;
import java.util.List;
import java.util.Optional;
-import java.util.function.Function;
public class GcsFileInputPlugin
implements FileInputPlugin
{
- private static final Logger log = Exec.getLogger(GcsFileInputPlugin.class);
-
@Override
public ConfigDiff transaction(ConfigSource config,
FileInputPlugin.Control control)
{
PluginTask task = config.loadConfig(PluginTask.class);
@@ -40,16 +34,16 @@
catch (IOException ex) {
throw Throwables.propagate(ex);
}
}
- if (task.getAuthMethod().getString().equals("json_key")) {
+ if (AuthUtils.AuthMethod.json_key.equals(task.getAuthMethod())) {
if (!task.getJsonKeyfile().isPresent()) {
throw new ConfigException("If auth_method is json_key, you have to set json_keyfile");
}
}
- else if (task.getAuthMethod().getString().equals("private_key")) {
+ else if (AuthUtils.AuthMethod.private_key.equals(task.getAuthMethod())) {
if (!task.getP12Keyfile().isPresent() || !task.getServiceAccountEmail().isPresent()) {
throw new ConfigException("If auth_method is private_key, you have to set both service_account_email and p12_keyfile");
}
}
@@ -58,15 +52,13 @@
if (task.getLastPath().get().length() >= 128) {
throw new ConfigException("last_path length is allowed between 1 and 1024 bytes");
}
}
- Storage client = GcsFileInput.newGcsClient(task, newGcsAuth(task));
-
// list files recursively if path_prefix is specified
if (task.getPathPrefix().isPresent()) {
- task.setFiles(GcsFileInput.listFiles(task, client));
+ task.setFiles(GcsFileInput.listFiles(task));
}
else {
if (task.getPathFiles().isEmpty()) {
throw new ConfigException("No file is found. Confirm paths option isn't empty");
}
@@ -78,26 +70,10 @@
}
// number of processors is same with number of files
return resume(task.dump(), task.getFiles().getTaskCount(), control);
}
- private GcsAuthentication newGcsAuth(PluginTask task)
- {
- try {
- return new GcsAuthentication(
- task.getAuthMethod().getString(),
- task.getServiceAccountEmail(),
- task.getP12Keyfile().map(localFileToPathString()),
- task.getJsonKeyfile().map(localFileToPathString()),
- task.getApplicationName()
- );
- }
- catch (GeneralSecurityException | IOException ex) {
- throw new ConfigException(ex);
- }
- }
-
@Override
public ConfigDiff resume(TaskSource taskSource,
int taskCount,
FileInputPlugin.Control control)
{
@@ -117,20 +93,9 @@
@Override
public void cleanup(TaskSource taskSource,
int taskCount,
List<TaskReport> successTaskReports)
{
- }
-
- private Function<LocalFile, String> localFileToPathString()
- {
- return new Function<LocalFile, String>()
- {
- public String apply(LocalFile file)
- {
- return file.getPath().toString();
- }
- };
}
@Override
public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
{