src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.3 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.4
- old
+ new
@@ -79,11 +79,11 @@
@Override
public ConfigDiff transaction(ConfigSource config,
FileInputPlugin.Control control)
{
- final PluginTask task = config.loadConfig(PluginTask.class);
+ PluginTask task = config.loadConfig(PluginTask.class);
try {
httpTransport = GoogleNetHttpTransport.newTrustedTransport();
jsonFactory = new JacksonFactory();
} catch (Exception e) {
@@ -99,21 +99,28 @@
@Override
public ConfigDiff resume(TaskSource taskSource,
int taskCount,
FileInputPlugin.Control control)
{
- final PluginTask task = taskSource.loadTask(PluginTask.class);
+ PluginTask task = taskSource.loadTask(PluginTask.class);
control.run(taskSource, taskCount);
+ ConfigDiff configDiff = Exec.newConfigDiff();
+
List<String> files = new ArrayList<String>(task.getFiles());
- if (files.size() == 0) {
- return null;
+ if (files.isEmpty()) {
+ // keep the last value if any
+ if (task.getLastPath().isPresent()) {
+ configDiff.set("last_path", task.getLastPath().get());
+ }
+ } else {
+ Collections.sort(files);
+ configDiff.set("last_path", files.get(files.size() - 1));
}
- Collections.sort(files);
- return Exec.newConfigDiff().
- set("last_path", files.get(files.size() - 1));
+
+ return configDiff;
}
@Override
public void cleanup(TaskSource taskSource,
int taskCount,
@@ -187,11 +194,11 @@
log.debug("bucket name: " + bucket);
log.debug("bucket location: " + bk.getLocation());
log.debug("bucket timeCreated: " + bk.getTimeCreated());
log.debug("bucket owner: " + bk.getOwner());
}
- } catch (Exception e) {
+ } catch (IOException e) {
log.warn("Could not access to bucket:" + bucket);
log.warn(e.getMessage());
}
@@ -217,21 +224,21 @@
}
}
lastKey = objects.getNextPageToken();
listObjects.setPageToken(lastKey);
} while (lastKey != null);
- } catch (Exception e) {
+ } catch (IOException e) {
log.warn(String.format("Could not get file list from bucket:%s", bucket));
log.warn(e.getMessage());
}
return builder.build();
}
@Override
public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
{
- final PluginTask task = taskSource.loadTask(PluginTask.class);
+ PluginTask task = taskSource.loadTask(PluginTask.class);
return new GcsFileInput(task, taskIndex);
}
public static class GcsFileInput
extends InputStreamFileInput