package org.embulk.filter.column; import io.github.medjed.jsonpathcompiler.InvalidPathException; import io.github.medjed.jsonpathcompiler.expressions.Path; import io.github.medjed.jsonpathcompiler.expressions.path.ArrayPathToken; import io.github.medjed.jsonpathcompiler.expressions.path.PathCompiler; import io.github.medjed.jsonpathcompiler.expressions.path.PathToken; import io.github.medjed.jsonpathcompiler.expressions.path.WildcardPathToken; import org.embulk.config.ConfigException; import org.embulk.filter.column.ColumnFilterPlugin.ColumnConfig; import org.embulk.filter.column.ColumnFilterPlugin.PluginTask; import org.embulk.spi.Exec; import org.embulk.spi.Schema; import org.embulk.spi.SchemaConfigException; import org.embulk.spi.type.BooleanType; import org.embulk.spi.type.DoubleType; import org.embulk.spi.type.JsonType; import org.embulk.spi.type.LongType; import org.embulk.spi.type.StringType; import org.embulk.spi.type.TimestampType; import org.embulk.spi.type.Type; import org.embulk.spi.type.Types; import org.msgpack.value.ArrayValue; import org.msgpack.value.MapValue; import org.msgpack.value.Value; import org.msgpack.value.ValueFactory; import org.slf4j.Logger; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; public class JsonVisitor { static final Logger logger = Exec.getLogger(ColumnFilterPlugin.class); final PluginTask task; final Schema inputSchema; final Schema outputSchema; // jsonpath final HashSet shouldVisitSet = new HashSet<>(); // parent jsonpath => { jsonpath => json column } final HashMap> jsonColumns = new HashMap<>(); // parent jsonpath => { jsonpath => json column } final HashMap> jsonAddColumns = new HashMap<>(); // parent jsonpath => [ jsonpath ] final HashMap> jsonDropColumns = new HashMap<>(); JsonVisitor(PluginTask task, Schema inputSchema, Schema outputSchema) { this.task = task; this.inputSchema = inputSchema; this.outputSchema = outputSchema; buildShouldVisitSet(); buildJsonSchema(); } static Value getDefault(PluginTask task, String name, Type type, ColumnConfig columnConfig) { Object defaultValue = ColumnVisitorImpl.getDefault(task, name, type, columnConfig); if (defaultValue == null) { return ValueFactory.newNil(); } if (type instanceof BooleanType) { return ValueFactory.newBoolean((Boolean) defaultValue); } else if (type instanceof LongType) { return ValueFactory.newInteger((Long) defaultValue); } else if (type instanceof DoubleType) { return ValueFactory.newFloat((Double) defaultValue); } else if (type instanceof StringType) { return ValueFactory.newString((String) defaultValue.toString()); } else if (type instanceof JsonType) { return (Value) defaultValue; } else if (type instanceof TimestampType) { throw new ConfigException("type: timestamp is not available in json path"); } else { throw new ConfigException(String.format("type: '%s' is not supported", type)); } } private void jsonColumnsPut(String path, JsonColumn value) { Path compiledPath = PathCompiler.compile(path); String parentPath = compiledPath.getParentPath(); if (! jsonColumns.containsKey(parentPath)) { jsonColumns.put(parentPath, new LinkedHashMap()); } jsonColumns.get(parentPath).put(compiledPath.toString(), value); } private boolean jsonColumnsContainsKey(String path) { Path compiledPath = PathCompiler.compile(path); String parentPath = compiledPath.getParentPath(); if (jsonColumns.containsKey(parentPath)) { return jsonColumns.get(parentPath).containsKey(compiledPath.toString()); } else { return false; } } private void jsonAddColumnsPut(String path, JsonColumn value) { Path compiledPath = PathCompiler.compile(path); String parentPath = compiledPath.getParentPath(); if (! jsonAddColumns.containsKey(parentPath)) { jsonAddColumns.put(parentPath, new LinkedHashMap()); } jsonAddColumns.get(parentPath).put(compiledPath.toString(), value); } private boolean jsonAddColumnsContainsKey(String path) { Path compiledPath = PathCompiler.compile(path); String parentPath = compiledPath.getParentPath(); if (jsonAddColumns.containsKey(parentPath)) { return jsonAddColumns.get(parentPath).containsKey(compiledPath.toString()); } else { return false; } } private void jsonDropColumnsPut(String path) { Path compiledPath = PathCompiler.compile(path); String parentPath = compiledPath.getParentPath(); if (! jsonDropColumns.containsKey(parentPath)) { jsonDropColumns.put(parentPath, new HashSet()); } jsonDropColumns.get(parentPath).add(compiledPath.toString()); } private void buildJsonColumns() { List columns = task.getColumns(); for (ColumnConfig column : columns) { String name = column.getName(); // skip NON json path notation to build output schema if (! PathCompiler.isProbablyJsonPath(name)) { continue; } JsonPathUtil.assertDoNotEndsWithArrayWildcard(name); // automatically fill ancestor jsonpaths for (JsonColumn ancestorJsonColumn : getAncestorJsonColumnList(name)) { String ancestorJsonPath = ancestorJsonColumn.getPath(); if (!jsonColumnsContainsKey(ancestorJsonPath)) { jsonColumnsPut(ancestorJsonPath, ancestorJsonColumn); } } // leaf jsonpath if (column.getSrc().isPresent()) { String src = column.getSrc().get(); jsonColumnsPut(name, new JsonColumn(name, null, null, src)); } else if (column.getType().isPresent() && column.getDefault().isPresent()) { // add column Type type = column.getType().get(); Value defaultValue = getDefault(task, name, type, column); jsonColumnsPut(name, new JsonColumn(name, type, defaultValue)); } else { Type type = column.getType().isPresent() ? column.getType().get() : null; jsonColumnsPut(name, new JsonColumn(name, type)); } } } private void buildJsonAddColumns() { List addColumns = task.getAddColumns(); for (ColumnConfig column : addColumns) { String name = column.getName(); // skip NON json path notation to build output schema if (! PathCompiler.isProbablyJsonPath(name)) { continue; } JsonPathUtil.assertDoNotEndsWithArrayWildcard(name); // automatically fill ancestor jsonpaths for (JsonColumn ancestorJsonColumn : getAncestorJsonColumnList(name)) { String ancestorJsonPath = ancestorJsonColumn.getPath(); if (!jsonAddColumnsContainsKey(ancestorJsonPath)) { jsonAddColumnsPut(ancestorJsonPath, ancestorJsonColumn); } } // leaf jsonpath if (column.getSrc().isPresent()) { String src = column.getSrc().get(); jsonAddColumnsPut(name, new JsonColumn(name, null, null, src)); } else if (column.getType().isPresent() && column.getDefault().isPresent()) { // add column Type type = column.getType().get(); Value defaultValue = getDefault(task, name, type, column); jsonAddColumnsPut(name, new JsonColumn(name, type, defaultValue)); } else { throw new SchemaConfigException(String.format("add_columns: Column '%s' does not have \"src\", or \"type\" and \"default\"", name)); } } } private void buildJsonDropColumns() { List dropColumns = task.getDropColumns(); for (ColumnConfig dropColumn : dropColumns) { String name = dropColumn.getName(); // skip NON json path notation to build output schema if (! PathCompiler.isProbablyJsonPath(name)) { continue; } jsonDropColumnsPut(name); } } // build jsonColumns, jsonAddColumns, and jsonDropColumns private void buildJsonSchema() { if (task.getDropColumns().size() > 0) { buildJsonDropColumns(); } else if (task.getColumns().size() > 0) { buildJsonColumns(); } // Add columns to last. If you want to add to head or middle, you can use `columns` option if (task.getAddColumns().size() > 0) { buildJsonAddColumns(); } } // json partial path => Boolean to avoid unnecessary type: json visit private void buildShouldVisitSet() { ArrayList columnConfigs = new ArrayList<>(task.getColumns()); columnConfigs.addAll(task.getAddColumns()); columnConfigs.addAll(task.getDropColumns()); for (ColumnConfig columnConfig : columnConfigs) { String name = columnConfig.getName(); if (!PathCompiler.isProbablyJsonPath(name)) { continue; } JsonPathUtil.assertJsonPathFormat(name); for (JsonColumn ancestorJsonColumn : getAncestorJsonColumnList(name)) { this.shouldVisitSet.add(ancestorJsonColumn.getPath()); } Path path = PathCompiler.compile(name); this.shouldVisitSet.add(path.toString()); } } /* *
     * $['foo']['bar'][0]['baz']
     * #=>
     * name: $['foo'], type: json, default: {}
     * name: $['foo']['bar'], type: json, default: []
     * name: $['foo']['bar'][0], type: json, default: {}
* * @return ancestors as an array */ public static ArrayList getAncestorJsonColumnList(String path) { ArrayList ancestorJsonColumnList = new ArrayList<>(); Path compiledPath; try { compiledPath = PathCompiler.compile(path); } catch (InvalidPathException e) { throw new ConfigException(String.format("jsonpath %s, %s", path, e.getMessage())); } StringBuilder partialPath = new StringBuilder("$"); PathToken parts = compiledPath.getRoot(); parts =; // skip "$" while (! parts.isLeaf()) { partialPath.append(parts.getPathFragment()); PathToken next =; JsonColumn jsonColumn; if (next instanceof ArrayPathToken || next instanceof WildcardPathToken) { jsonColumn = new JsonColumn(partialPath.toString(), Types.JSON, ValueFactory.newArray(new Value[0], false)); } else { jsonColumn = new JsonColumn(partialPath.toString(), Types.JSON, ValueFactory.newMap(new Value[0])); } ancestorJsonColumnList.add(jsonColumn); parts = next; } return ancestorJsonColumnList; } boolean shouldVisit(String jsonPath) { return shouldVisitSet.contains(jsonPath); } String newArrayJsonPath(String rootPath, int i) { String newPath = new StringBuilder(rootPath).append("[").append(Integer.toString(i)).append("]").toString(); if (! shouldVisit(newPath)) { newPath = new StringBuilder(rootPath).append("[*]").toString(); // try [*] too } return newPath; } String newMapJsonPath(String rootPath, Value elementPathValue) { String elementPath = elementPathValue.asStringValue().asString(); String newPath = new StringBuilder(rootPath).append("['").append(elementPath).append("']").toString(); return newPath; } Value visitArray(String rootPath, ArrayValue arrayValue) { int size = arrayValue.size(); ArrayList newValue = new ArrayList<>(size); int j = 0; if (this.jsonDropColumns.containsKey(rootPath)) { HashSet jsonDropColumns = this.jsonDropColumns.get(rootPath); for (int i = 0; i < size; i++) { String newPath = newArrayJsonPath(rootPath, i); if (! jsonDropColumns.contains(newPath)) { Value v = arrayValue.get(i); newValue.add(j++, visit(newPath, v)); } } } else if (this.jsonColumns.containsKey(rootPath)) { for (JsonColumn jsonColumn : this.jsonColumns.get(rootPath).values()) { int src = jsonColumn.getSrcTailIndex().intValue(); if (src == JsonColumn.WILDCARD_INDEX) { for (int i = 0; i < size; i++) { Value v = arrayValue.get(i); if (v == null) { v = jsonColumn.getDefaultValue(); } String newPath = jsonColumn.getPath(); Value visited = visit(newPath, v); newValue.add(j++, visited == null ? ValueFactory.newNil() : visited); } } else { Value v = (src < arrayValue.size() ? arrayValue.get(src) : null); if (v == null) { v = jsonColumn.getDefaultValue(); } String newPath = jsonColumn.getPath(); Value visited = visit(newPath, v); newValue.add(j++, visited == null ? ValueFactory.newNil() : visited); } } } else { for (int i = 0; i < size; i++) { String newPath = newArrayJsonPath(rootPath, i); Value v = arrayValue.get(i); newValue.add(j++, visit(newPath, v)); } } if (this.jsonAddColumns.containsKey(rootPath)) { for (JsonColumn jsonColumn : this.jsonAddColumns.get(rootPath).values()) { int i = jsonColumn.getTailIndex().intValue(); if (i == JsonColumn.WILDCARD_INDEX || i < size) { // index for add_columns must be larger than size // just skip because we can not raise ConfigException beforehand for flexible JSON continue; } int src = jsonColumn.getSrcTailIndex().intValue(); Value v = (src < arrayValue.size() ? arrayValue.get(src) : null); if (v == null) { v = jsonColumn.getDefaultValue(); } String newPath = jsonColumn.getPath(); Value visited = visit(newPath, v); // this ignores specified index, but appends to last now newValue.add(j++, visited == null ? ValueFactory.newNil() : visited); } } return ValueFactory.newArray(newValue.toArray(new Value[0]), true); } Value visitMap(String rootPath, MapValue mapValue) { int size = mapValue.size(); int i = 0; ArrayList newValue = new ArrayList<>(size * 2); if (this.jsonDropColumns.containsKey(rootPath)) { HashSet jsonDropColumns = this.jsonDropColumns.get(rootPath); for (Map.Entry entry : mapValue.entrySet()) { Value k = entry.getKey(); Value v = entry.getValue(); String newPath = newMapJsonPath(rootPath, k); if (! jsonDropColumns.contains(newPath)) { Value visited = visit(newPath, v); newValue.add(i++, k); newValue.add(i++, visited); } } } else if (this.jsonColumns.containsKey(rootPath)) { Map map =; for (JsonColumn jsonColumn : this.jsonColumns.get(rootPath).values()) { Value src = jsonColumn.getSrcTailNameValue(); Value v = map.get(src); if (v == null) { v = jsonColumn.getDefaultValue(); } String newPath = jsonColumn.getPath(); Value visited = visit(newPath, v); newValue.add(i++, jsonColumn.getTailNameValue()); newValue.add(i++, visited == null ? ValueFactory.newNil() : visited); } } else { for (Map.Entry entry : mapValue.entrySet()) { Value k = entry.getKey(); Value v = entry.getValue(); String newPath = newMapJsonPath(rootPath, k); Value visited = visit(newPath, v); newValue.add(i++, k); newValue.add(i++, visited); } } if (this.jsonAddColumns.containsKey(rootPath)) { Map map =; for (JsonColumn jsonColumn : this.jsonAddColumns.get(rootPath).values()) { Value k = jsonColumn.getTailNameValue(); if (map.containsKey(k)) { // key must be different with already existing one for add_columns // just skip because we can not raise ConfigException beforehand for flexible JSON continue; } Value src = jsonColumn.getSrcTailNameValue(); Value v = map.get(src); if (v == null) { v = jsonColumn.getDefaultValue(); } String newPath = jsonColumn.getPath(); Value visited = visit(newPath, v); newValue.add(i++, jsonColumn.getTailNameValue()); newValue.add(i++, visited == null ? ValueFactory.newNil() : visited); } } return ValueFactory.newMap(newValue.toArray(new Value[0]), true); } public Value visit(String rootPath, Value value) { if (! shouldVisit(rootPath)) { return value; } if (value == null) { return null; } else if (value.isArrayValue()) { return visitArray(rootPath, value.asArrayValue()); } else if (value.isMapValue()) { return visitMap(rootPath, value.asMapValue()); } else { return value; } } }