package org.embulk.filter.postgress_lookup; import com.google.common.collect.ImmutableList; import org.embulk.config.*; import org.embulk.spi.*; import org.embulk.spi.time.Timestamp; import org.embulk.spi.type.Types; import java.sql.*; import java.time.Instant; import java.util.*; public class PostgressLookupFilterPlugin implements FilterPlugin { public interface PluginTask extends Task { @Config("host") public String getHost(); @Config("port") public String getPort(); @Config("database") public String getDatabase(); @Config("table") public String getTableName(); @Config("username") public String getUserName(); @Config("password") public String getPassword(); @Config("mapping_from") public List getMappingFrom(); @Config("mapping_to") public List getMappingTo(); @Config("new_columns") public SchemaConfig getNewColumns(); @Config("driver_path") @ConfigDefault("null") public Optional getDriverPath(); @Config("driver_class") @ConfigDefault("null") public Optional getDriverClass(); @Config("schema_name") @ConfigDefault("null") public Optional getSchemaName(); } @Override public void transaction(ConfigSource config, Schema inputSchema, Control control) { PluginTask task = config.loadConfig(PluginTask.class); List inputColumns = task.getMappingFrom(); List keyColumns = task.getMappingTo(); if(inputColumns.size()!=keyColumns.size()){ throw new RuntimeException("Number of mapping_from columns must be exactly equals to number of mapping_to columns"); } Schema outputSchema = inputSchema; ImmutableList.Builder builder = ImmutableList.builder(); int i = 0; for (Column inputColumn : inputSchema.getColumns()) { Column outputColumn = new Column(i++, inputColumn.getName(), inputColumn.getType()); builder.add(outputColumn); } for (ColumnConfig columnConfig : task.getNewColumns().getColumns()) { builder.add(columnConfig.toColumn(i++)); } outputSchema = new Schema(builder.build()); control.run(task.dump(), outputSchema); } @Override public PageOutput open(TaskSource taskSource, Schema inputSchema, Schema outputSchema, PageOutput output) { PluginTask task = taskSource.loadTask(PluginTask.class); Map> map = new HashMap<>(); try { map = getKeyValueMap(task); } catch (SQLException e) { e.printStackTrace(); } PageReader pageReader = new PageReader(inputSchema); return new MyOutput(pageReader, inputSchema, outputSchema, output, task, map); } private Map> getKeyValueMap(PluginTask task) throws SQLException { Map> map = new HashMap<>(); Connection con = PostGresConnection.getConnection(task); DatabaseMetaData databaseMetaData =con.getMetaData(); String identifierQuoteString=databaseMetaData.getIdentifierQuoteString(); //String schemaName=null; if (task.getSchemaName().isPresent()) { String sql = "SET search_path TO " + identifierQuoteString + task.getSchemaName().get() + identifierQuoteString; System.out.println(sql); Statement stmt = con.createStatement(); try { stmt.executeUpdate(sql); } finally { stmt.close(); } } con.setAutoCommit(false); try { List targetColumns = task.getMappingTo(); List newColumns = new ArrayList<>(); for (ColumnConfig columnConfig : task.getNewColumns().getColumns()) { newColumns.add(columnConfig.getName()); } String query = "select "; String columnNeedsToBeFetched = ""; for (int i = 0; i < targetColumns.size(); i++) { columnNeedsToBeFetched += targetColumns.get(i) + ","; } for (int i = 0; i < newColumns.size(); i++) { if (i != newColumns.size() - 1) { columnNeedsToBeFetched += newColumns.get(i) + ","; } else { columnNeedsToBeFetched += newColumns.get(i); } } query += columnNeedsToBeFetched + " from " + task.getTableName(); Statement stmt = con.createStatement(); ResultSet rs = stmt.executeQuery(query); while (rs.next()) { //for key String key = ""; String comp = ""; for (int i = 0; i < targetColumns.size(); i++) { comp = "" + rs.getString(targetColumns.get(i)); if (comp.equalsIgnoreCase("null")) { key += ""; } else { key += rs.getString(targetColumns.get(i)); } if (i != targetColumns.size() - 1) { key += ","; } } //for values List keyArray = new ArrayList<>(); for (int i = 0; i < newColumns.size(); i++) { comp = "" + rs.getString(newColumns.get(i)); if (comp.equalsIgnoreCase("null")) { keyArray.add(""); } else { keyArray.add(rs.getString(newColumns.get(i))); } } map.put(key, keyArray); } } catch (Exception e) { e.printStackTrace(); } finally { con.close(); } return map; } public static class MyOutput implements PageOutput { private PageReader reader; private PageBuilder builder; private PluginTask task; private Schema inputSchema; private Map> keyValuePair; public MyOutput(PageReader pageReader, Schema inputSchema, Schema outputSchema, PageOutput pageOutput, PluginTask task, Map> keyValuePair) { this.reader = pageReader; this.builder = new PageBuilder(Exec.getBufferAllocator(), outputSchema, pageOutput); this.task = task; this.inputSchema = inputSchema; this.keyValuePair = keyValuePair; } @Override public void add(Page page) { reader.setPage(page); List columnConfigList = new ArrayList<>(); for (ColumnConfig columnConfig : task.getNewColumns().getColumns()) { columnConfigList.add(columnConfig); } while (reader.nextRecord()) { int colNum = 0; List inputColumns = task.getMappingFrom(); List searchingKeyData = new ArrayList<>(); Map keyMap = new HashMap<>(); keyMap.put("Key", 0); for (Column column : inputSchema.getColumns()) { if (reader.isNull(column)) { if ((keyMap.get("Key") < inputColumns.size()) && column.getName().equalsIgnoreCase(inputColumns.get(keyMap.get("Key")))) { searchingKeyData.add(""); int key = keyMap.get("Key"); keyMap.put("Key", ++key); } builder.setNull(colNum++); } else { add_builder(colNum++, column, searchingKeyData, inputColumns, keyMap); } } String key = ""; for (int k = 0; k < searchingKeyData.size(); k++) { key += searchingKeyData.get(k); if (k != searchingKeyData.size() - 1) { key += ","; } } List matchedData = new ArrayList<>(); if (keyValuePair.containsKey(key)) { matchedData = keyValuePair.get(key); } if (matchedData.size() == 0) { for (int k = 0; k < columnConfigList.size(); k++) { add_builder_for_new_column(colNum, columnConfigList.get(k).getType().getName(), "", false); colNum++; } } else { for (int k = 0; k < columnConfigList.size(); k++) { add_builder_for_new_column(colNum, columnConfigList.get(k).getType().getName(), matchedData.get(k), true); colNum++; } } builder.addRecord(); } } @Override public void finish() { builder.finish(); } @Override public void close() { builder.close(); } private void add_builder(int colNum, Column column, List searchingKeyData, List inputColumns, Map keyMap) { if (Types.STRING.equals(column.getType())) { if (keyMap.get("Key") < inputColumns.size()) { if (column.getName().equalsIgnoreCase(inputColumns.get(keyMap.get("Key")))) { searchingKeyData.add(reader.getString(column)); int key = keyMap.get("Key"); keyMap.put("Key", ++key); } } builder.setString(colNum, reader.getString(column)); } else if (Types.BOOLEAN.equals(column.getType())) { if (keyMap.get("Key") < inputColumns.size()) { if (column.getName().equalsIgnoreCase(inputColumns.get(keyMap.get("Key")))) { searchingKeyData.add(String.valueOf(reader.getBoolean(column))); int key = keyMap.get("Key"); keyMap.put("Key", ++key); } } builder.setBoolean(colNum, reader.getBoolean(column)); } else if (Types.DOUBLE.equals(column.getType())) { if (keyMap.get("Key") < inputColumns.size()) { if (column.getName().equalsIgnoreCase(inputColumns.get(keyMap.get("Key")))) { searchingKeyData.add(String.valueOf(reader.getDouble(column))); int key = keyMap.get("Key"); keyMap.put("Key", ++key); } } builder.setDouble(colNum, reader.getDouble(column)); } else if (Types.LONG.equals(column.getType())) { if (keyMap.get("Key") < inputColumns.size()) { if (column.getName().equalsIgnoreCase(inputColumns.get(keyMap.get("Key")))) { searchingKeyData.add(String.valueOf(reader.getLong(column))); int key = keyMap.get("Key"); keyMap.put("Key", ++key); } } builder.setLong(colNum, reader.getLong(column)); } else if (Types.TIMESTAMP.equals(column.getType())) { if (keyMap.get("Key") < inputColumns.size()) { if (column.getName().equalsIgnoreCase(inputColumns.get(keyMap.get("Key")))) { searchingKeyData.add(String.valueOf(reader.getTimestamp(column))); int key = keyMap.get("Key"); keyMap.put("Key", ++key); } } builder.setTimestamp(colNum, reader.getTimestamp(column)); } } private void add_builder_for_new_column(int colNum, String newlyAddedColumnType, String matchedData, Boolean isDataMatched) { try{ if (newlyAddedColumnType.equalsIgnoreCase("string")) { if (isDataMatched) { builder.setString(colNum, matchedData); } else { builder.setString(colNum, ""); } } else if (newlyAddedColumnType.equalsIgnoreCase("long")) { if (isDataMatched) { if (matchedData.length() == 0) { builder.setLong(colNum, 0); }else{ builder.setLong(colNum, Long.parseLong(matchedData)); } } else { builder.setLong(colNum, 0); } } else if (newlyAddedColumnType.equalsIgnoreCase("double")) { if (isDataMatched) { if (matchedData.length() == 0) { builder.setDouble(colNum, 0.0); }else{ builder.setDouble(colNum, Double.parseDouble(matchedData)); } } else { builder.setDouble(colNum, 0.0); } } else if (newlyAddedColumnType.equalsIgnoreCase("boolean")) { if (isDataMatched) { if (matchedData.length() == 0) { builder.setNull(colNum); }else{ builder.setBoolean(colNum, Boolean.parseBoolean(matchedData)); } } else { builder.setNull(colNum); } } else if (newlyAddedColumnType.equalsIgnoreCase("timestamp")) { if (isDataMatched) { if (matchedData.length() == 0) { builder.setNull(colNum); }else{ java.sql.Timestamp timestamp = java.sql.Timestamp.valueOf(matchedData); Instant instant = timestamp.toInstant(); Timestamp spiTimeStamp = Timestamp.ofInstant(instant); builder.setTimestamp(colNum, spiTimeStamp); } } else { builder.setNull(colNum); } } }catch (Exception e){ e.printStackTrace(); throw new RuntimeException("Data type could not be cast due to wrong data or issue in typecasting timestamp",e); } } } }