src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.8 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.9
- old
+ new
@@ -84,11 +84,10 @@
@ConfigInject
BufferAllocator getBufferAllocator();
}
private static final Logger log = Exec.getLogger(GcsFileInputPlugin.class);
- private static GcsAuthentication auth;
@Override
public ConfigDiff transaction(ConfigSource config,
FileInputPlugin.Control control)
{
@@ -113,26 +112,31 @@
if (!task.getP12Keyfile().isPresent() || !task.getServiceAccountEmail().isPresent()) {
throw new ConfigException("If auth_method is private_key, you have to set both service_account_email and p12_keyfile");
}
}
+ Storage client = newGcsClient(task, newGcsAuth(task));
+
+ // list files recursively
+ task.setFiles(listFiles(task, client));
+ // number of processors is same with number of files
+ return resume(task.dump(), task.getFiles().size(), control);
+ }
+
+ private GcsAuthentication newGcsAuth(PluginTask task)
+ {
try {
- auth = new GcsAuthentication(
+ return new GcsAuthentication(
task.getAuthMethod().getString(),
task.getServiceAccountEmail(),
task.getP12Keyfile().transform(localFileToPathString()),
task.getJsonKeyfile().transform(localFileToPathString()),
task.getApplicationName()
);
} catch (GeneralSecurityException | IOException ex) {
throw new ConfigException(ex);
}
-
- // list files recursively
- task.setFiles(listFiles(task));
- // number of processors is same with number of files
- return resume(task.dump(), task.getFiles().size(), control);
}
@Override
public ConfigDiff resume(TaskSource taskSource,
int taskCount,
@@ -163,11 +167,11 @@
int taskCount,
List<TaskReport> successTaskReports)
{
}
- protected Storage newGcsClient(final PluginTask task)
+ protected Storage newGcsClient(final PluginTask task, final GcsAuthentication auth)
{
Storage client = null;
try {
client = auth.getGcsClient(task.getBucket());
} catch (IOException ex) {
@@ -186,13 +190,12 @@
return file.getPath().toString();
}
};
}
- public List<String> listFiles(PluginTask task)
+ public List<String> listFiles(PluginTask task, Storage client)
{
- Storage client = newGcsClient(task);
String bucket = task.getBucket();
return listGcsFilesByPrefix(client, bucket, task.getPathPrefix(), task.getLastPath());
}
@@ -289,10 +292,10 @@
private final String key;
private boolean opened = false;
public SingleFileProvider(PluginTask task, int taskIndex)
{
- this.client = newGcsClient(task);
+ this.client = newGcsClient(task, newGcsAuth(task));
this.bucket = task.getBucket();
this.key = task.getFiles().get(taskIndex);
}
@Override