package <%= java_package_name %>; import java.util.List; import java.util.ArrayList; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import org.embulk.config.TaskReport; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigInject; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; import org.embulk.config.Task; import org.embulk.config.TaskSource; import org.embulk.spi.Exec; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.BufferAllocator; import org.embulk.spi.TransactionalFileInput; import org.embulk.spi.util.InputStreamTransactionalFileInput; public class <%= java_class_name %> implements FileInputPlugin { public interface PluginTask extends Task { // configuration option 1 (required integer) @Config("option1") public int getOption1(); // configuration option 2 (optional string, null is not allowed) @Config("optoin2") @ConfigDefault("\"myvalue\"") public String getOption2(); // configuration option 3 (optional string, null is allowed) @Config("optoin3") @ConfigDefault("null") public Optional getOption3(); //@Config("path_prefix") //public String getPathPrefix(); //@Config("last_path") //@ConfigDefault("null") //public Optional getLastPath(); // usually, you store list of files in task to pass them from transaction() to run(). //public List getFiles(); //public void setFiles(List files); @ConfigInject public BufferAllocator getBufferAllocator(); } @Override public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); // run() method is called for this number of times in parallel. int taskCount = 1; // usually, taskCount is number of input files. //task.setFiles(listFiles(task)); //int taskCount = task.getFiles().size(); return resume(task.dump(), taskCount, control); } // usually, you have an method to create list of files //List listFiles(PluginTask task) //{ // final ImmutableList.Builder builder = ImmutableList.builder(); // for (String path : listFilesWithPrefix(task.getPathPrefix())) { // if (task.getLastPath().isPresent() && path.compareTo(task.getLastPath().get())) { // continue; // } // builder.add(path); // } // return builder.build(); //} @Override public ConfigDiff resume(TaskSource taskSource, int taskCount, FileInputPlugin.Control control) { control.run(taskSource, taskCount); ConfigDiff configDiff = Exec.newConfigDiff(); // usually, yo uset last_path //if (task.getFiles().isEmpty()) { // if (task.getLastPath().isPresent()) { // configDiff.set("last_path", task.getLastPath().get()); // } //} else { // List files = new ArrayList(task.getFiles()); // Collections.sort(files); // configDiff.set("last_path", files.get(files.size() - 1)); //} return configDiff; } @Override public void cleanup(TaskSource taskSource, int taskCount, List successTaskReports) { } @Override public TransactionalFileInput open(TaskSource taskSource, int taskIndex) { final PluginTask task = taskSource.loadTask(PluginTask.class); // Write your code here :) throw new UnsupportedOperationException("<%= java_class_name %>.open method is not implemented yet"); // if you expect InputStream, you can use this code: //InputStream input = openInputStream(task, task.getFiles().get(taskIndex)); // //return new InputStreamTransactionalFileInput(task.getBufferAllocator(), input) { // @Override // public void abort() // { } // // @Override // public TaskReport commit() // { // return Exec.newTaskReport(); // } //} } //private static InputStream openInputStream(PluginTask task, String path) //{ // return new MyInputStream(file); //} }