src/main/java/org/embulk/input/rethinkdb/RethinkdbInputPlugin.java in embulk-input-rethinkdb-0.1.1 vs src/main/java/org/embulk/input/rethinkdb/RethinkdbInputPlugin.java in embulk-input-rethinkdb-0.1.2
- old
+ new
@@ -78,14 +78,21 @@
@Config("query")
@ConfigDefault("null")
Optional<String> getQuery();
+ @Config("table")
+ @ConfigDefault("null")
+ Optional<String> getTable();
+
@Config("column_name")
@ConfigDefault("\"record\"")
String getColumnName();
+ String getReql();
+ void setReql(String ast);
+
@ConfigInject
BufferAllocator getBufferAllocator();
}
private static final Logger logger = LoggerFactory.getLogger(RethinkdbInputPlugin.class);
@@ -100,13 +107,25 @@
}
if (!(task.getUser().isPresent() && task.getPassword().isPresent())) {
throw new ConfigException("user and password are needed");
}
- if (!task.getQuery().isPresent()) {
- throw new ConfigException("query is needed");
+
+ String reql;
+ if (task.getQuery().isPresent()) {
+ if (task.getTable().isPresent()) {
+ throw new ConfigException("only one of 'table' or 'query' parameter is needed");
+ }
+ reql = String.format("var ast = %s; var res = {ast: ast}; res;", task.getQuery().get());
}
+ else {
+ if (!task.getTable().isPresent()) {
+ throw new ConfigException("'table' or 'query' parameter is needed");
+ }
+ reql = String.format("var ast = r.table('%s'); var res = {ast: ast}; res;", task.getTable().get());
+ }
+ task.setReql(reql);
Schema schema = Schema.builder().add(task.getColumnName(), Types.JSON).build();
int taskCount = 1;
return resume(task.dump(), schema, taskCount, control);
@@ -139,16 +158,15 @@
final Column column = pageBuilder.getSchema().getColumns().get(0);
RethinkDB r = RethinkDB.r;
ReqlAst ast;
try {
- ast = compileReQL(r, task.getQuery().get());
+ ast = compileReQL(r, task.getReql());
}
catch (final ScriptException se) {
throw new ConfigException("ReQL compile error");
}
-
Connection.Builder builder = r.connection()
.hostname(task.getHost())
.port(task.getPort())
.db(task.getDatabase())
.user(task.getUser().get(), task.getPassword().get());
@@ -183,17 +201,16 @@
public ConfigDiff guess(ConfigSource config)
{
return Exec.newConfigDiff();
}
- private ReqlAst compileReQL(RethinkDB r, String query) throws ScriptException
+ private ReqlAst compileReQL(RethinkDB r, String reql) throws ScriptException
{
ScriptEngineManager factory = new ScriptEngineManager();
ScriptEngine engine = factory.getEngineByName("nashorn");
engine.put("r", r);
- String reql = String.format("var ast = %s; var res = {ast: ast}; res;", query);
Bindings bindings = (Bindings) engine.eval(reql);
return (ReqlAst) bindings.get("ast");
}