package org.embulk.spi.util;

import java.util.List;
import com.google.common.collect.ImmutableList;
import org.embulk.config.TaskSource;
import org.embulk.config.ConfigSource;
import org.embulk.config.CommitReport;
import org.embulk.config.ConfigDiff;
import org.embulk.plugin.PluginType;
import org.embulk.spi.ExecSession;
import org.embulk.spi.Schema;
import org.embulk.spi.PageOutput;
import org.embulk.spi.FilterPlugin;

/**
 * Static helpers for working with an ordered list of {@link FilterPlugin}s:
 * resolving plugin types from configs, instantiating the plugins, running
 * their transactions as one nested transaction, and chaining their
 * {@link PageOutput}s.
 *
 * <p>This class only exposes static methods and cannot be instantiated.
 */
public abstract class Filters {
    private Filters() { }

    /**
     * Reads the {@code "type"} entry of every config as a {@link PluginType}.
     *
     * @param configs filter configs, in filter order
     * @return an immutable list holding one plugin type per config, in the same order
     */
    public static List<PluginType> getPluginTypes(List<ConfigSource> configs) {
        ImmutableList.Builder<PluginType> builder = ImmutableList.builder();
        for (ConfigSource config : configs) {
            builder.add(config.get(PluginType.class, "type"));
        }
        return builder.build();
    }

    /**
     * Creates one filter plugin per config, using each config's {@code "type"} entry.
     *
     * @param exec    session used to create the plugins
     * @param configs filter configs, in filter order
     * @return an immutable list of plugins, in the same order as {@code configs}
     */
    public static List<FilterPlugin> newFilterPluginsFromConfigSources(ExecSession exec, List<ConfigSource> configs) {
        return newFilterPlugins(exec, getPluginTypes(configs));
    }

    /**
     * Creates one filter plugin per plugin type via {@code exec.newPlugin}.
     *
     * @param exec        session used to create the plugins
     * @param pluginTypes plugin types, in filter order
     * @return an immutable list of plugins, in the same order as {@code pluginTypes}
     */
    public static List<FilterPlugin> newFilterPlugins(ExecSession exec, List<PluginType> pluginTypes) {
        ImmutableList.Builder<FilterPlugin> builder = ImmutableList.builder();
        for (PluginType pluginType : pluginTypes) {
            builder.add(exec.newPlugin(FilterPlugin.class, pluginType));
        }
        return builder.build();
    }

    /**
     * Callback invoked by {@link #transaction} once every filter's transaction
     * has been entered.
     */
    public interface Control {
        /**
         * @param taskSources   one task source per filter, in filter order
         * @param filterSchemas the input schema followed by each filter's output
         *                      schema, i.e. one more entry than {@code taskSources}
         */
        void run(List<TaskSource> taskSources, List<Schema> filterSchemas);
    }

    /**
     * Runs the transactions of all filters nested inside one another, then
     * calls {@code control} from within the innermost one.
     *
     * @param plugins     filters, in the order pages should pass through them
     * @param configs     config for each filter; indexed in parallel with {@code plugins}
     * @param inputSchema schema fed to the first filter
     * @param control     callback receiving the collected task sources and schemas
     */
    public static void transaction(List<FilterPlugin> plugins, List<ConfigSource> configs,
            Schema inputSchema, Filters.Control control) {
        new RecursiveControl(plugins, configs, control).transaction(inputSchema);
    }

    /**
     * Chains the filters in front of {@code output}.
     *
     * <p>The chain is built from the last filter back to the first, so that
     * filter {@code i} is opened with the {@link PageOutput} of filter
     * {@code i + 1} (or the final {@code output} for the last filter). That is
     * the output whose schema matches {@code filterSchemas.get(i + 1)}.
     * The returned output therefore belongs to the first filter and expects
     * pages in {@code filterSchemas.get(0)}. This assumes each plugin's
     * {@code open} returns a {@link PageOutput} that forwards its results to
     * the output it was given.
     *
     * @param plugins       filters, in the order pages should pass through them
     * @param taskSources   task source for each filter; indexed in parallel with {@code plugins}
     * @param filterSchemas input schema followed by each filter's output schema
     *                      ({@code plugins.size() + 1} entries)
     * @param output        final destination of the filtered pages
     * @return the head of the chain, or {@code output} itself when there are no filters
     */
    public static PageOutput open(List<FilterPlugin> plugins, List<TaskSource> taskSources,
            List<Schema> filterSchemas, PageOutput output) {
        PageOutput out = output;
        // Wrap from the tail: each filter must be handed the downstream output
        // that consumes its own output schema.
        for (int pos = plugins.size() - 1; pos >= 0; pos--) {
            out = plugins.get(pos).open(
                    taskSources.get(pos),
                    filterSchemas.get(pos),
                    filterSchemas.get(pos + 1),
                    out);
        }
        return out;
    }

    /**
     * Walks the filter list recursively: each filter's transaction callback
     * records its task source and starts the next filter's transaction with
     * the schema that filter produced.
     */
    private static class RecursiveControl {
        private final List<FilterPlugin> plugins;
        private final List<ConfigSource> configs;
        private final Filters.Control finalControl;
        private final ImmutableList.Builder<TaskSource> taskSources;
        private final ImmutableList.Builder<Schema> filterSchemas;
        // Index of the filter whose transaction is entered next.
        private int pos;

        RecursiveControl(List<FilterPlugin> plugins, List<ConfigSource> configs,
                Filters.Control finalControl) {
            this.plugins = plugins;
            this.configs = configs;
            this.finalControl = finalControl;
            this.taskSources = ImmutableList.builder();
            this.filterSchemas = ImmutableList.builder();
        }

        /**
         * Records {@code inputSchema}, then either enters the transaction of
         * the filter at {@code pos} or, when no filter is left, runs the final
         * control with everything collected so far.
         *
         * @param inputSchema schema going into the filter at the current position
         */
        public void transaction(Schema inputSchema) {
            filterSchemas.add(inputSchema);
            if (pos < plugins.size()) {
                plugins.get(pos).transaction(configs.get(pos), inputSchema, new FilterPlugin.Control() {
                    @Override
                    public void run(TaskSource taskSource, Schema outputSchema) {
                        taskSources.add(taskSource);
                        pos++;
                        // This filter's output schema is the next filter's input schema.
                        transaction(outputSchema);
                    }
                });
            } else {
                finalControl.run(taskSources.build(), filterSchemas.build());
            }
        }
    }
}