src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.3 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.1.4

- old
+ new

@@ -79,11 +79,11 @@ @Override public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) { - final PluginTask task = config.loadConfig(PluginTask.class); + PluginTask task = config.loadConfig(PluginTask.class); try { httpTransport = GoogleNetHttpTransport.newTrustedTransport(); jsonFactory = new JacksonFactory(); } catch (Exception e) { @@ -99,21 +99,28 @@ @Override public ConfigDiff resume(TaskSource taskSource, int taskCount, FileInputPlugin.Control control) { - final PluginTask task = taskSource.loadTask(PluginTask.class); + PluginTask task = taskSource.loadTask(PluginTask.class); control.run(taskSource, taskCount); + ConfigDiff configDiff = Exec.newConfigDiff(); + List<String> files = new ArrayList<String>(task.getFiles()); - if (files.size() == 0) { - return null; + if (files.isEmpty()) { + // keep the last value if any + if (task.getLastPath().isPresent()) { + configDiff.set("last_path", task.getLastPath().get()); + } + } else { + Collections.sort(files); + configDiff.set("last_path", files.get(files.size() - 1)); } - Collections.sort(files); - return Exec.newConfigDiff(). - set("last_path", files.get(files.size() - 1)); + + return configDiff; } @Override public void cleanup(TaskSource taskSource, int taskCount, @@ -187,11 +194,11 @@ log.debug("bucket name: " + bucket); log.debug("bucket location: " + bk.getLocation()); log.debug("bucket timeCreated: " + bk.getTimeCreated()); log.debug("bucket owner: " + bk.getOwner()); } - } catch (Exception e) { + } catch (IOException e) { log.warn("Could not access to bucket:" + bucket); log.warn(e.getMessage()); } @@ -217,21 +224,21 @@ } } lastKey = objects.getNextPageToken(); listObjects.setPageToken(lastKey); } while (lastKey != null); - } catch (Exception e) { + } catch (IOException e) { log.warn(String.format("Could not get file list from bucket:%s", bucket)); log.warn(e.getMessage()); } return builder.build(); } @Override public TransactionalFileInput open(TaskSource taskSource, int taskIndex) { - final PluginTask task = taskSource.loadTask(PluginTask.class); + PluginTask task = taskSource.loadTask(PluginTask.class); return new GcsFileInput(task, taskIndex); } public static class GcsFileInput extends InputStreamFileInput