src/main/java/org/embulk/executor/remoteserver/RemoteServerExecutor.java in embulk-executor-remoteserver-0.2.1 vs src/main/java/org/embulk/executor/remoteserver/RemoteServerExecutor.java in embulk-executor-remoteserver-0.3.0
- old
+ new
@@ -22,10 +22,11 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class RemoteServerExecutor implements ExecutorPlugin {
private static final Logger log = LoggerFactory.getLogger(RemoteServerExecutor.class);
@@ -40,12 +41,37 @@
@Config("timeout_seconds")
@ConfigDefault("3600")
int getTimeoutSeconds();
+ @Config("tls")
+ @ConfigDefault("false")
+ boolean getTLS();
+
+ @Config("key_store_p12")
+ @ConfigDefault("null")
+ Optional<P12File> getKeyStoreP12();
+
+ @Config("trust_store_p12")
+ @ConfigDefault("null")
+ Optional<P12File> getTrustStoreP12();
+
@ConfigInject
ModelManager getModelManager();
+
+ // Used for the local mode (mainly for testing)
+ @Config("__server_key_store_p12")
+ @ConfigDefault("null")
+ Optional<P12File> getServerKeyStoreP12();
+
+ @Config("__server_trust_store_p12")
+ @ConfigDefault("null")
+ Optional<P12File> getServerTrustStoreP12();
+
+ @Config("__server_enable_client_auth")
+ @ConfigDefault("false")
+ boolean getServerEnableClientAuth();
}
@Inject
public RemoteServerExecutor(@ForSystemConfig ConfigSource systemConfig, ScriptingContainer jruby) {
this.systemConfig = systemConfig;
@@ -54,12 +80,12 @@
@Override
public void transaction(ConfigSource config, Schema outputSchema, int inputTaskCount, Control control) {
PluginTask task = config.loadConfig(PluginTask.class);
if (task.getHosts().isEmpty()) {
- log.info("Hosts is empty. Run with a local server.");
- try (EmbulkServer _autoclosed = EmbulkServer.start(DEFAULT_HOST.getName(), DEFAULT_HOST.getPort(), 1)) {
+ log.info("Hosts is empty. Run as the local mode.");
+ try (EmbulkServer _autoclosed = startEmbulkServer(task)) {
control.transaction(outputSchema, inputTaskCount, new ExecutorImpl(inputTaskCount, task, Collections.singletonList(DEFAULT_HOST)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} else {
@@ -101,11 +127,19 @@
String pluginTaskJson = modelManager.writeObject(pluginTask);
String processTaskJson = modelManager.writeObject(processTask);
ClientSession session = new ClientSession(
systemConfigJson, pluginTaskJson, processTaskJson, gemSpecs, pluginArchiveBytes, state, inputTaskCount, modelManager);
- try (EmbulkClient client = EmbulkClient.open(session, hosts)) {
+
+ TLSConfig tlsConfig = null;
+ if (pluginTask.getTLS()) {
+ tlsConfig = new TLSConfig();
+ pluginTask.getKeyStoreP12().ifPresent(tlsConfig::keyStore);
+ pluginTask.getTrustStoreP12().ifPresent(tlsConfig::trustStore);
+ }
+
+ try (EmbulkClient client = EmbulkClient.open(session, hosts, tlsConfig)) {
client.createSession();
state.initialize(inputTaskCount, inputTaskCount);
for (int i = 0; i < inputTaskCount; i++) {
if (state.getOutputTaskState(i).isCommitted()) {
@@ -129,7 +163,20 @@
.build();
try (FileOutputStream fos = new FileOutputStream(tempFile)) {
return archive.dump(fos);
}
}
+ }
+
+ private EmbulkServer startEmbulkServer(PluginTask task) throws IOException {
+ TLSConfig tlsConfig = null;
+ if (task.getTLS()) {
+ tlsConfig = new TLSConfig();
+ task.getServerKeyStoreP12().ifPresent(tlsConfig::keyStore);
+ task.getServerTrustStoreP12().ifPresent(tlsConfig::trustStore);
+ if (task.getServerEnableClientAuth()) {
+ tlsConfig.enableClientAuth(true);
+ }
+ }
+ return EmbulkServer.start(DEFAULT_HOST.getName(), DEFAULT_HOST.getPort(), 1, tlsConfig);
}
}