src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.7 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.8

- old
+ new

@@ -163,11 +163,11 @@ int taskCount, List<TaskReport> successTaskReports) { } - private static Storage newGcsClient(final PluginTask task) + protected Storage newGcsClient(final PluginTask task) { Storage client = null; try { client = auth.getGcsClient(task.getBucket()); } catch (IOException ex) { @@ -211,16 +211,15 @@ // @see https://cloud.google.com/storage/docs/json_api/v1/objects#resource try { Storage.Buckets.Get getBucket = client.buckets().get(bucket); getBucket.setProjection("full"); Bucket bk = getBucket.execute(); - if (log.isDebugEnabled()) { - log.debug("bucket name: " + bucket); - log.debug("bucket location: " + bk.getLocation()); - log.debug("bucket timeCreated: " + bk.getTimeCreated()); - log.debug("bucket owner: " + bk.getOwner()); - } + + log.debug("bucket name: " + bucket); + log.debug("bucket location: " + bk.getLocation()); + log.debug("bucket timeCreated: " + bk.getTimeCreated()); + log.debug("bucket owner: " + bk.getOwner()); } catch (IOException e) { log.warn("Could not access to bucket:" + bucket); log.warn(e.getMessage()); } @@ -236,17 +235,15 @@ if (items == null) { log.info(String.format("No file was found in bucket:%s prefix:%s", bucket, prefix)); break; } for (StorageObject o : items) { - if (log.isDebugEnabled()) { - log.debug("filename: " + o.getName()); - log.debug("updated: " + o.getUpdated()); - } if (o.getSize().compareTo(BigInteger.ZERO) > 0) { builder.add(o.getName()); } + log.debug("filename: " + o.getName()); + log.debug("updated: " + o.getUpdated()); } lastKey = objects.getNextPageToken(); listObjects.setPageToken(lastKey); } while (lastKey != null); } catch (IOException e) { @@ -262,54 +259,54 @@ { PluginTask task = taskSource.loadTask(PluginTask.class); return new GcsFileInput(task, taskIndex); } - public static class GcsFileInput + public class GcsFileInput extends InputStreamFileInput implements TransactionalFileInput { - private static class SingleFileProvider - implements InputStreamFileInput.Provider - { - private final Storage client; - private final String bucket; - private final String key; - private boolean opened = false; - - public SingleFileProvider(PluginTask task, int taskIndex) - { - this.client = newGcsClient(task); - this.bucket = task.getBucket(); - this.key = task.getFiles().get(taskIndex); - } - - @Override - public InputStream openNext() throws IOException - { - if (opened) { - return null; - } - opened = true; - Storage.Objects.Get getObject = client.objects().get(bucket, key); - - return getObject.executeMediaAsInputStream(); - } - - @Override - public void close() { } - } - public GcsFileInput(PluginTask task, int taskIndex) { super(task.getBufferAllocator(), new SingleFileProvider(task, taskIndex)); } public void abort() { } public TaskReport commit() { return Exec.newTaskReport(); + } + + @Override + public void close() { } + } + + private class SingleFileProvider + implements InputStreamFileInput.Provider + { + private final Storage client; + private final String bucket; + private final String key; + private boolean opened = false; + + public SingleFileProvider(PluginTask task, int taskIndex) + { + this.client = newGcsClient(task); + this.bucket = task.getBucket(); + this.key = task.getFiles().get(taskIndex); + } + + @Override + public InputStream openNext() throws IOException + { + if (opened) { + return null; + } + opened = true; + Storage.Objects.Get getObject = client.objects().get(bucket, key); + + return getObject.executeMediaAsInputStream(); } @Override public void close() { } }