src/main/java/org/embulk/executor/remoteserver/RemoteServerExecutor.java in embulk-executor-remoteserver-0.1.1 vs src/main/java/org/embulk/executor/remoteserver/RemoteServerExecutor.java in embulk-executor-remoteserver-0.2.0

- old
+ new

@@ -23,21 +23,22 @@ import java.io.UncheckedIOException; import java.nio.file.Files; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; public class RemoteServerExecutor implements ExecutorPlugin { private static final Logger log = LoggerFactory.getLogger(RemoteServerExecutor.class); - private static final Host DEFAULT_HOST = new Host("localhost", 30000); + private static final Host DEFAULT_HOST = new Host("localhost", 30001); private final ConfigSource systemConfig; private final ScriptingContainer jruby; interface PluginTask extends Task { @Config("hosts") @ConfigDefault("[]") - List<Host> getHosts(); + List<String> getHosts(); @Config("timeout_seconds") @ConfigDefault("3600") int getTimeoutSeconds(); @@ -60,11 +61,14 @@ control.transaction(outputSchema, inputTaskCount, new ExecutorImpl(inputTaskCount, task, Collections.singletonList(DEFAULT_HOST))); } catch (IOException e) { throw new UncheckedIOException(e); } } else { - control.transaction(outputSchema, inputTaskCount, new ExecutorImpl(inputTaskCount, task, task.getHosts())); + control.transaction( + outputSchema, + inputTaskCount, + new ExecutorImpl(inputTaskCount, task, task.getHosts().stream().map(Host::of).collect(Collectors.toList()))); } } private class ExecutorImpl implements ExecutorPlugin.Executor { private final PluginTask pluginTask; @@ -95,23 +99,23 @@ ModelManager modelManager = pluginTask.getModelManager(); String systemConfigJson = modelManager.writeObject(systemConfigToSend); String pluginTaskJson = modelManager.writeObject(pluginTask); String processTaskJson = modelManager.writeObject(processTask); - SessionState sessionState = new SessionState( + ClientSession session = new ClientSession( systemConfigJson, pluginTaskJson, processTaskJson, gemSpecs, pluginArchiveBytes, state, inputTaskCount, modelManager); - try (EmbulkClient client = EmbulkClient.open(sessionState, hosts)) { + try (EmbulkClient client = EmbulkClient.open(session, hosts)) { client.createSession(); state.initialize(inputTaskCount, inputTaskCount); for (int i = 0; i < inputTaskCount; i++) { if (state.getOutputTaskState(i).isCommitted()) { log.warn("Skipped resumed task {}", i); continue; } client.startTask(i); } - sessionState.waitUntilCompleted(pluginTask.getTimeoutSeconds() + 1); // Add 1 sec to consider network latency + session.waitUntilCompleted(pluginTask.getTimeoutSeconds() + 1); // Add 1 sec to consider network latency } catch (InterruptedException | TimeoutException e) { throw new IllegalStateException(e); } catch (IOException e) { throw new UncheckedIOException(e); }