src/main/java/org/embulk/executor/remoteserver/RemoteServerExecutor.java in embulk-executor-remoteserver-0.1.1 vs src/main/java/org/embulk/executor/remoteserver/RemoteServerExecutor.java in embulk-executor-remoteserver-0.2.0
- old
+ new
@@ -23,21 +23,22 @@
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
public class RemoteServerExecutor implements ExecutorPlugin {
private static final Logger log = LoggerFactory.getLogger(RemoteServerExecutor.class);
- private static final Host DEFAULT_HOST = new Host("localhost", 30000);
+ private static final Host DEFAULT_HOST = new Host("localhost", 30001);
private final ConfigSource systemConfig;
private final ScriptingContainer jruby;
interface PluginTask extends Task {
@Config("hosts")
@ConfigDefault("[]")
- List<Host> getHosts();
+ List<String> getHosts();
@Config("timeout_seconds")
@ConfigDefault("3600")
int getTimeoutSeconds();
@@ -60,11 +61,14 @@
control.transaction(outputSchema, inputTaskCount, new ExecutorImpl(inputTaskCount, task, Collections.singletonList(DEFAULT_HOST)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} else {
- control.transaction(outputSchema, inputTaskCount, new ExecutorImpl(inputTaskCount, task, task.getHosts()));
+ control.transaction(
+ outputSchema,
+ inputTaskCount,
+ new ExecutorImpl(inputTaskCount, task, task.getHosts().stream().map(Host::of).collect(Collectors.toList())));
}
}
private class ExecutorImpl implements ExecutorPlugin.Executor {
private final PluginTask pluginTask;
@@ -95,23 +99,23 @@
ModelManager modelManager = pluginTask.getModelManager();
String systemConfigJson = modelManager.writeObject(systemConfigToSend);
String pluginTaskJson = modelManager.writeObject(pluginTask);
String processTaskJson = modelManager.writeObject(processTask);
- SessionState sessionState = new SessionState(
+ ClientSession session = new ClientSession(
systemConfigJson, pluginTaskJson, processTaskJson, gemSpecs, pluginArchiveBytes, state, inputTaskCount, modelManager);
- try (EmbulkClient client = EmbulkClient.open(sessionState, hosts)) {
+ try (EmbulkClient client = EmbulkClient.open(session, hosts)) {
client.createSession();
state.initialize(inputTaskCount, inputTaskCount);
for (int i = 0; i < inputTaskCount; i++) {
if (state.getOutputTaskState(i).isCommitted()) {
log.warn("Skipped resumed task {}", i);
continue;
}
client.startTask(i);
}
- sessionState.waitUntilCompleted(pluginTask.getTimeoutSeconds() + 1); // Add 1 sec to consider network latency
+ session.waitUntilCompleted(pluginTask.getTimeoutSeconds() + 1); // Add 1 sec to consider network latency
} catch (InterruptedException | TimeoutException e) {
throw new IllegalStateException(e);
} catch (IOException e) {
throw new UncheckedIOException(e);
}