src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.8 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.9

- old
+ new

@@ -84,11 +84,10 @@ @ConfigInject BufferAllocator getBufferAllocator(); } private static final Logger log = Exec.getLogger(GcsFileInputPlugin.class); - private static GcsAuthentication auth; @Override public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) { @@ -113,26 +112,31 @@ if (!task.getP12Keyfile().isPresent() || !task.getServiceAccountEmail().isPresent()) { throw new ConfigException("If auth_method is private_key, you have to set both service_account_email and p12_keyfile"); } } + Storage client = newGcsClient(task, newGcsAuth(task)); + + // list files recursively + task.setFiles(listFiles(task, client)); + // number of processors is same with number of files + return resume(task.dump(), task.getFiles().size(), control); + } + + private GcsAuthentication newGcsAuth(PluginTask task) + { try { - auth = new GcsAuthentication( + return new GcsAuthentication( task.getAuthMethod().getString(), task.getServiceAccountEmail(), task.getP12Keyfile().transform(localFileToPathString()), task.getJsonKeyfile().transform(localFileToPathString()), task.getApplicationName() ); } catch (GeneralSecurityException | IOException ex) { throw new ConfigException(ex); } - - // list files recursively - task.setFiles(listFiles(task)); - // number of processors is same with number of files - return resume(task.dump(), task.getFiles().size(), control); } @Override public ConfigDiff resume(TaskSource taskSource, int taskCount, @@ -163,11 +167,11 @@ int taskCount, List<TaskReport> successTaskReports) { } - protected Storage newGcsClient(final PluginTask task) + protected Storage newGcsClient(final PluginTask task, final GcsAuthentication auth) { Storage client = null; try { client = auth.getGcsClient(task.getBucket()); } catch (IOException ex) { @@ -186,13 +190,12 @@ return file.getPath().toString(); } }; } - public List<String> listFiles(PluginTask task) + public List<String> listFiles(PluginTask task, Storage client) { - Storage client = newGcsClient(task); String bucket = task.getBucket(); return listGcsFilesByPrefix(client, bucket, task.getPathPrefix(), task.getLastPath()); } @@ -289,10 +292,10 @@ private final String key; private boolean opened = false; public SingleFileProvider(PluginTask task, int taskIndex) { - this.client = newGcsClient(task); + this.client = newGcsClient(task, newGcsAuth(task)); this.bucket = task.getBucket(); this.key = task.getFiles().get(taskIndex); } @Override