src/main/java/org/embulk/input/rethinkdb/RethinkdbInputPlugin.java in embulk-input-rethinkdb-0.1.1 vs src/main/java/org/embulk/input/rethinkdb/RethinkdbInputPlugin.java in embulk-input-rethinkdb-0.1.2

- old
+ new

@@ -78,14 +78,21 @@ @Config("query") @ConfigDefault("null") Optional<String> getQuery(); + @Config("table") + @ConfigDefault("null") + Optional<String> getTable(); + @Config("column_name") @ConfigDefault("\"record\"") String getColumnName(); + String getReql(); + void setReql(String ast); + @ConfigInject BufferAllocator getBufferAllocator(); } private static final Logger logger = LoggerFactory.getLogger(RethinkdbInputPlugin.class); @@ -100,13 +107,25 @@ } if (!(task.getUser().isPresent() && task.getPassword().isPresent())) { throw new ConfigException("user and password are needed"); } - if (!task.getQuery().isPresent()) { - throw new ConfigException("query is needed"); + + String reql; + if (task.getQuery().isPresent()) { + if (task.getTable().isPresent()) { + throw new ConfigException("only one of 'table' or 'query' parameter is needed"); + } + reql = String.format("var ast = %s; var res = {ast: ast}; res;", task.getQuery().get()); } + else { + if (!task.getTable().isPresent()) { + throw new ConfigException("'table' or 'query' parameter is needed"); + } + reql = String.format("var ast = r.table('%s'); var res = {ast: ast}; res;", task.getTable().get()); + } + task.setReql(reql); Schema schema = Schema.builder().add(task.getColumnName(), Types.JSON).build(); int taskCount = 1; return resume(task.dump(), schema, taskCount, control); @@ -139,16 +158,15 @@ final Column column = pageBuilder.getSchema().getColumns().get(0); RethinkDB r = RethinkDB.r; ReqlAst ast; try { - ast = compileReQL(r, task.getQuery().get()); + ast = compileReQL(r, task.getReql()); } catch (final ScriptException se) { throw new ConfigException("ReQL compile error"); } - Connection.Builder builder = r.connection() .hostname(task.getHost()) .port(task.getPort()) .db(task.getDatabase()) .user(task.getUser().get(), task.getPassword().get()); @@ -183,17 +201,16 @@ public ConfigDiff guess(ConfigSource config) { return Exec.newConfigDiff(); } - private ReqlAst compileReQL(RethinkDB r, String query) throws ScriptException + private ReqlAst compileReQL(RethinkDB r, String reql) throws ScriptException { ScriptEngineManager factory = new ScriptEngineManager(); ScriptEngine engine = factory.getEngineByName("nashorn"); engine.put("r", r); - String reql = String.format("var ast = %s; var res = {ast: ast}; res;", query); Bindings bindings = (Bindings) engine.eval(reql); return (ReqlAst) bindings.get("ast"); }