package org.embulk.input.rethinkdb; import com.google.common.base.Optional; import com.rethinkdb.RethinkDB; import com.rethinkdb.ast.ReqlAst; import com.rethinkdb.net.Connection; import com.rethinkdb.net.Cursor; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigException; import org.embulk.config.ConfigInject; import org.embulk.config.ConfigSource; import org.embulk.config.Task; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.BufferAllocator; import org.embulk.spi.Column; import org.embulk.spi.DataException; import org.embulk.spi.Exec; import org.embulk.spi.InputPlugin; import org.embulk.spi.PageBuilder; import org.embulk.spi.PageOutput; import org.embulk.spi.Schema; import org.embulk.spi.type.Types; import org.msgpack.value.Value; import org.msgpack.value.ValueFactory; import javax.script.Bindings; import javax.script.ScriptEngine; import javax.script.ScriptEngineManager; import javax.script.ScriptException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; public class RethinkdbInputPlugin implements InputPlugin { public interface PluginTask extends Task { @Config("host") String getHost(); @Config("port") @ConfigDefault("28015") int getPort(); @Config("database") String getDatabase(); @Config("user") @ConfigDefault("null") Optional getUser(); @Config("password") @ConfigDefault("null") Optional getPassword(); @Config("auth_key") @ConfigDefault("null") Optional getAuthKey(); @Config("cert_file") @ConfigDefault("null") Optional getCertFile(); @Config("query") @ConfigDefault("null") Optional getQuery(); @Config("column_name") @ConfigDefault("\"record\"") String getColumnName(); @ConfigInject BufferAllocator getBufferAllocator(); } @Override public ConfigDiff transaction(ConfigSource config, InputPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); if (task.getAuthKey().isPresent()) { throw new ConfigException("auth_key option is not supported yet"); } if (task.getCertFile().isPresent()) { throw new ConfigException("cert_file option is not supported yet"); } if (!(task.getUser().isPresent() && task.getPassword().isPresent())) { throw new ConfigException("user and password are needed"); } if (!task.getQuery().isPresent()) { throw new ConfigException("query is needed"); } Schema schema = Schema.builder().add(task.getColumnName(), Types.JSON).build(); int taskCount = 1; return resume(task.dump(), schema, taskCount, control); } @Override public ConfigDiff resume(TaskSource taskSource, Schema schema, int taskCount, InputPlugin.Control control) { control.run(taskSource, schema, taskCount); return Exec.newConfigDiff(); } @Override public void cleanup(TaskSource taskSource, Schema schema, int taskCount, List successTaskReports) { } @Override public TaskReport run(TaskSource taskSource, Schema schema, int taskIndex, PageOutput output) { PluginTask task = taskSource.loadTask(PluginTask.class); BufferAllocator allocator = task.getBufferAllocator(); PageBuilder pageBuilder = new PageBuilder(allocator, schema, output); final Column column = pageBuilder.getSchema().getColumns().get(0); RethinkDB r = RethinkDB.r; ReqlAst ast; try { ast = compileReQL(r, task.getQuery().get()); } catch (final ScriptException se) { throw new ConfigException("ReQL compile error"); } Connection conn = r.connection() .hostname(task.getHost()) .port(task.getPort()) .db(task.getDatabase()) .user(task.getUser().get(), task.getPassword().get()) .connect(); Cursor cursor = ast.run(conn); for (Object doc : cursor) { pageBuilder.setJson(column, doc2Value(doc)); pageBuilder.addRecord(); } conn.close(); pageBuilder.finish(); return Exec.newTaskReport(); } @Override public ConfigDiff guess(ConfigSource config) { return Exec.newConfigDiff(); } private ReqlAst compileReQL(RethinkDB r, String query) throws ScriptException { ScriptEngineManager factory = new ScriptEngineManager(); ScriptEngine engine = factory.getEngineByName("nashorn"); engine.put("r", r); String reql = String.format("var ast = %s; var res = {ast: ast}; res;", query); Bindings bindings = (Bindings) engine.eval(reql); return (ReqlAst) bindings.get("ast"); } private Value doc2Value(Object doc) throws DataException { if (doc == null) { return ValueFactory.newNil(); } else if (doc instanceof java.lang.Long) { return ValueFactory.newInteger((java.lang.Long) doc); } else if (doc instanceof java.lang.Double) { return ValueFactory.newFloat((java.lang.Double) doc); } else if (doc instanceof java.lang.String) { return ValueFactory.newString((java.lang.String) doc); } else if (doc instanceof java.lang.Boolean) { return ValueFactory.newBoolean((java.lang.Boolean) doc); } else if (doc instanceof java.time.OffsetDateTime) { return ValueFactory.newString(((java.time.OffsetDateTime) doc).toString()); } else if (doc instanceof java.util.List) { List list = new ArrayList<>(); for (Object obj : ((java.util.List) doc)) { list.add(doc2Value(obj)); } return ValueFactory.newArray(list); } else if (doc instanceof java.util.Map) { Map map = new HashMap<>(); Map m = (Map) doc; Set> entries = m.entrySet(); for (Map.Entry e : entries) { map.put(doc2Value(e.getKey()), doc2Value(e.getValue())); } return ValueFactory.newMap(map); } else { throw new DataException("Record parse error, unknown document type"); } } }