embulk-standards/src/main/java/org/embulk/standards/LocalFileOutputPlugin.java in embulk-0.3.2 vs embulk-standards/src/main/java/org/embulk/standards/LocalFileOutputPlugin.java in embulk-0.4.0

- old
+ new

@@ -6,12 +6,13 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; import org.embulk.config.Config; +import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigSource; -import org.embulk.config.NextConfig; +import org.embulk.config.ConfigDiff; import org.embulk.config.CommitReport; import org.embulk.config.Task; import org.embulk.config.TaskSource; import org.embulk.spi.Buffer; import org.embulk.spi.FileOutputPlugin; @@ -23,42 +24,42 @@ implements FileOutputPlugin { public interface PluginTask extends Task { - @Config("directory") - public String getDirectory(); + @Config("path_prefix") + public String getPathPrefix(); - @Config("file_name") - public String getFileNameFormat(); - @Config("file_ext") public String getFileNameExtension(); - // TODO support in FileInputPlugin and FileOutputPlugin - //@Config("compress_type") - //public String getCompressType(); + @Config("sequence_format") + @ConfigDefault("\".%03d.%02d\"") + public String getSequenceFormat(); } private final Logger log = Exec.getLogger(getClass()); @Override - public NextConfig transaction(ConfigSource config, int processorCount, + public ConfigDiff transaction(ConfigSource config, int processorCount, FileOutputPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); + // validate sequence_format + String.format(task.getSequenceFormat(), 0, 0); + return resume(task.dump(), processorCount, control); } @Override - public NextConfig resume(TaskSource taskSource, + public ConfigDiff resume(TaskSource taskSource, int processorCount, FileOutputPlugin.Control control) { control.run(taskSource); - return Exec.newNextConfig(); + return Exec.newConfigDiff(); } @Override public void cleanup(TaskSource taskSource, int processorCount, @@ -68,25 +69,23 @@ @Override public TransactionalFileOutput open(TaskSource taskSource, final int processorIndex) { PluginTask task = taskSource.loadTask(PluginTask.class); - // TODO format path using timestamp - final String fileName = task.getFileNameFormat(); - - final String pathPrefix = task.getDirectory() + File.separator + fileName; + final String pathPrefix = task.getPathPrefix(); final String pathSuffix = task.getFileNameExtension(); + final String sequenceFormat = task.getSequenceFormat(); final List<String> fileNames = new ArrayList<>(); return new TransactionalFileOutput() { private int fileIndex = 0; private FileOutputStream output = null; public void nextFile() { closeFile(); - String path = pathPrefix + String.format(".%03d.%02d.", processorIndex, fileIndex) + pathSuffix; + String path = pathPrefix + String.format(sequenceFormat, processorIndex, fileIndex) + pathSuffix; log.info("Writing local file '{}'", path); fileNames.add(path); try { output = new FileOutputStream(new File(path)); } catch (FileNotFoundException ex) {