embulk-standards/src/main/java/org/embulk/standards/LocalFileOutputPlugin.java in embulk-0.3.2 vs embulk-standards/src/main/java/org/embulk/standards/LocalFileOutputPlugin.java in embulk-0.4.0
- old
+ new
@@ -6,12 +6,13 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import org.embulk.config.Config;
+import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigSource;
-import org.embulk.config.NextConfig;
+import org.embulk.config.ConfigDiff;
import org.embulk.config.CommitReport;
import org.embulk.config.Task;
import org.embulk.config.TaskSource;
import org.embulk.spi.Buffer;
import org.embulk.spi.FileOutputPlugin;
@@ -23,42 +24,42 @@
implements FileOutputPlugin
{
public interface PluginTask
extends Task
{
- @Config("directory")
- public String getDirectory();
+ @Config("path_prefix")
+ public String getPathPrefix();
- @Config("file_name")
- public String getFileNameFormat();
-
@Config("file_ext")
public String getFileNameExtension();
- // TODO support in FileInputPlugin and FileOutputPlugin
- //@Config("compress_type")
- //public String getCompressType();
+ @Config("sequence_format")
+ @ConfigDefault("\".%03d.%02d\"")
+ public String getSequenceFormat();
}
private final Logger log = Exec.getLogger(getClass());
@Override
- public NextConfig transaction(ConfigSource config, int processorCount,
+ public ConfigDiff transaction(ConfigSource config, int processorCount,
FileOutputPlugin.Control control)
{
PluginTask task = config.loadConfig(PluginTask.class);
+ // validate sequence_format
+ String.format(task.getSequenceFormat(), 0, 0);
+
return resume(task.dump(), processorCount, control);
}
@Override
- public NextConfig resume(TaskSource taskSource,
+ public ConfigDiff resume(TaskSource taskSource,
int processorCount,
FileOutputPlugin.Control control)
{
control.run(taskSource);
- return Exec.newNextConfig();
+ return Exec.newConfigDiff();
}
@Override
public void cleanup(TaskSource taskSource,
int processorCount,
@@ -68,25 +69,23 @@
@Override
public TransactionalFileOutput open(TaskSource taskSource, final int processorIndex)
{
PluginTask task = taskSource.loadTask(PluginTask.class);
- // TODO format path using timestamp
- final String fileName = task.getFileNameFormat();
-
- final String pathPrefix = task.getDirectory() + File.separator + fileName;
+ final String pathPrefix = task.getPathPrefix();
final String pathSuffix = task.getFileNameExtension();
+ final String sequenceFormat = task.getSequenceFormat();
final List<String> fileNames = new ArrayList<>();
return new TransactionalFileOutput() {
private int fileIndex = 0;
private FileOutputStream output = null;
public void nextFile()
{
closeFile();
- String path = pathPrefix + String.format(".%03d.%02d.", processorIndex, fileIndex) + pathSuffix;
+ String path = pathPrefix + String.format(sequenceFormat, processorIndex, fileIndex) + pathSuffix;
log.info("Writing local file '{}'", path);
fileNames.add(path);
try {
output = new FileOutputStream(new File(path));
} catch (FileNotFoundException ex) {