package org.embulk.spi; import java.util.List; import com.google.common.base.Function; import com.google.common.collect.Lists; import com.google.common.collect.ImmutableList; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonIgnore; import org.embulk.plugin.PluginType; import org.embulk.config.TaskSource; import org.embulk.spi.Schema; import org.embulk.spi.util.Executors; import org.embulk.spi.type.TimestampType; public class ProcessTask { private final PluginType inputPluginType; private final PluginType outputPluginType; private final List filterPluginTypes; private final TaskSource inputTaskSource; private final TaskSource outputTaskSource; private final List filterTaskSources; private final List schemas; private final Schema executorSchema; private TaskSource executorTaskSource; public ProcessTask( PluginType inputPluginType, PluginType outputPluginType, List filterPluginTypes, TaskSource inputTaskSource, TaskSource outputTaskSource, List filterTaskSources, List schemas, Schema executorSchema, TaskSource executorTaskSource) { this.inputPluginType = inputPluginType; this.outputPluginType = outputPluginType; this.filterPluginTypes = filterPluginTypes; this.inputTaskSource = inputTaskSource; this.outputTaskSource = outputTaskSource; this.filterTaskSources = filterTaskSources; this.schemas = schemas; this.executorSchema = executorSchema; this.executorTaskSource = executorTaskSource; } // TODO Because TimestampType doesn't store timestamp_format, serializing and deserializing // Schema loses timestamp_format information. Here uses SchemaConfig instead to preseve it. @JsonCreator ProcessTask( @JsonProperty("inputType") PluginType inputPluginType, @JsonProperty("outputType") PluginType outputPluginType, @JsonProperty("filterTypes") List filterPluginTypes, @JsonProperty("inputTask") TaskSource inputTaskSource, @JsonProperty("outputTask") TaskSource outputTaskSource, @JsonProperty("filterTasks") List filterTaskSources, @JsonProperty("schemas") List schemas, @JsonProperty("executorSchema") SchemaConfig executorSchema, @JsonProperty("executorTask") TaskSource executorTaskSource) { this(inputPluginType, outputPluginType, filterPluginTypes, inputTaskSource, outputTaskSource, filterTaskSources, ImmutableList.copyOf(Lists.transform(schemas, new Function() { public Schema apply(SchemaConfig s) { return s.toSchema(); } } )), executorSchema.toSchema(), executorTaskSource); } @JsonProperty("inputType") public PluginType getInputPluginType() { return inputPluginType; } @JsonProperty("outputType") public PluginType getOutputPluginType() { return outputPluginType; } @JsonProperty("filterTypes") public List getFilterPluginTypes() { return filterPluginTypes; } @JsonProperty("inputTask") public TaskSource getInputTaskSource() { return inputTaskSource; } @JsonProperty("outputTask") public TaskSource getOutputTaskSource() { return outputTaskSource; } @JsonProperty("filterTasks") public List getFilterTaskSources() { return filterTaskSources; } @JsonIgnore public List getFilterSchemas() { return schemas; } @JsonProperty("schemas") public List getFilterSchemaConfigs() { return Lists.transform(schemas, new Function() { public SchemaConfig apply(Schema schema) { return schemaToSchemaConfig(schema); } }); } @JsonIgnore public Schema getExecutorSchema() { return executorSchema; } @JsonProperty("executorSchema") SchemaConfig getExecutorSchemaConfig() { return schemaToSchemaConfig(executorSchema); } private static SchemaConfig schemaToSchemaConfig(Schema s) { return new SchemaConfig(Lists.transform(s.getColumns(), new Function() { public ColumnConfig apply(Column c) { if (c.getType() instanceof TimestampType) { return new ColumnConfig(c.getName(), c.getType(), ((TimestampType) c.getType()).getFormat()); } else { return new ColumnConfig(c.getName(), c.getType(), null); } } } )); } @JsonIgnore public Schema getInputSchema() { return Executors.getInputSchema(schemas); } @JsonIgnore public Schema getOutputSchema() { return Executors.getOutputSchema(schemas); } @JsonIgnore public void setExecutorTaskSource(TaskSource executorTaskSource) { this.executorTaskSource = executorTaskSource; } @JsonProperty("executorTask") public TaskSource getExecutorTaskSource() { return executorTaskSource; } }