src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.7 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.8
- old
+ new
@@ -163,11 +163,11 @@
int taskCount,
List<TaskReport> successTaskReports)
{
}
- private static Storage newGcsClient(final PluginTask task)
+ protected Storage newGcsClient(final PluginTask task)
{
Storage client = null;
try {
client = auth.getGcsClient(task.getBucket());
} catch (IOException ex) {
@@ -211,16 +211,15 @@
// @see https://cloud.google.com/storage/docs/json_api/v1/objects#resource
try {
Storage.Buckets.Get getBucket = client.buckets().get(bucket);
getBucket.setProjection("full");
Bucket bk = getBucket.execute();
- if (log.isDebugEnabled()) {
- log.debug("bucket name: " + bucket);
- log.debug("bucket location: " + bk.getLocation());
- log.debug("bucket timeCreated: " + bk.getTimeCreated());
- log.debug("bucket owner: " + bk.getOwner());
- }
+
+ log.debug("bucket name: " + bucket);
+ log.debug("bucket location: " + bk.getLocation());
+ log.debug("bucket timeCreated: " + bk.getTimeCreated());
+ log.debug("bucket owner: " + bk.getOwner());
} catch (IOException e) {
log.warn("Could not access to bucket:" + bucket);
log.warn(e.getMessage());
}
@@ -236,17 +235,15 @@
if (items == null) {
log.info(String.format("No file was found in bucket:%s prefix:%s", bucket, prefix));
break;
}
for (StorageObject o : items) {
- if (log.isDebugEnabled()) {
- log.debug("filename: " + o.getName());
- log.debug("updated: " + o.getUpdated());
- }
if (o.getSize().compareTo(BigInteger.ZERO) > 0) {
builder.add(o.getName());
}
+ log.debug("filename: " + o.getName());
+ log.debug("updated: " + o.getUpdated());
}
lastKey = objects.getNextPageToken();
listObjects.setPageToken(lastKey);
} while (lastKey != null);
} catch (IOException e) {
@@ -262,54 +259,54 @@
{
PluginTask task = taskSource.loadTask(PluginTask.class);
return new GcsFileInput(task, taskIndex);
}
- public static class GcsFileInput
+ public class GcsFileInput
extends InputStreamFileInput
implements TransactionalFileInput
{
- private static class SingleFileProvider
- implements InputStreamFileInput.Provider
- {
- private final Storage client;
- private final String bucket;
- private final String key;
- private boolean opened = false;
-
- public SingleFileProvider(PluginTask task, int taskIndex)
- {
- this.client = newGcsClient(task);
- this.bucket = task.getBucket();
- this.key = task.getFiles().get(taskIndex);
- }
-
- @Override
- public InputStream openNext() throws IOException
- {
- if (opened) {
- return null;
- }
- opened = true;
- Storage.Objects.Get getObject = client.objects().get(bucket, key);
-
- return getObject.executeMediaAsInputStream();
- }
-
- @Override
- public void close() { }
- }
-
public GcsFileInput(PluginTask task, int taskIndex)
{
super(task.getBufferAllocator(), new SingleFileProvider(task, taskIndex));
}
public void abort() { }
public TaskReport commit()
{
return Exec.newTaskReport();
+ }
+
+ @Override
+ public void close() { }
+ }
+
+ private class SingleFileProvider
+ implements InputStreamFileInput.Provider
+ {
+ private final Storage client;
+ private final String bucket;
+ private final String key;
+ private boolean opened = false;
+
+ public SingleFileProvider(PluginTask task, int taskIndex)
+ {
+ this.client = newGcsClient(task);
+ this.bucket = task.getBucket();
+ this.key = task.getFiles().get(taskIndex);
+ }
+
+ @Override
+ public InputStream openNext() throws IOException
+ {
+ if (opened) {
+ return null;
+ }
+ opened = true;
+ Storage.Objects.Get getObject = client.objects().get(bucket, key);
+
+ return getObject.executeMediaAsInputStream();
}
@Override
public void close() { }
}