src/main/java/org/embulk/executor/remoteserver/RemoteServerExecutor.java in embulk-executor-remoteserver-0.2.1 vs src/main/java/org/embulk/executor/remoteserver/RemoteServerExecutor.java in embulk-executor-remoteserver-0.3.0

- old
+ new

@@ -22,10 +22,11 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; public class RemoteServerExecutor implements ExecutorPlugin { private static final Logger log = LoggerFactory.getLogger(RemoteServerExecutor.class); @@ -40,12 +41,37 @@ @Config("timeout_seconds") @ConfigDefault("3600") int getTimeoutSeconds(); + @Config("tls") + @ConfigDefault("false") + boolean getTLS(); + + @Config("key_store_p12") + @ConfigDefault("null") + Optional<P12File> getKeyStoreP12(); + + @Config("trust_store_p12") + @ConfigDefault("null") + Optional<P12File> getTrustStoreP12(); + @ConfigInject ModelManager getModelManager(); + + // Used for the local mode (mainly for testing) + @Config("__server_key_store_p12") + @ConfigDefault("null") + Optional<P12File> getServerKeyStoreP12(); + + @Config("__server_trust_store_p12") + @ConfigDefault("null") + Optional<P12File> getServerTrustStoreP12(); + + @Config("__server_enable_client_auth") + @ConfigDefault("false") + boolean getServerEnableClientAuth(); } @Inject public RemoteServerExecutor(@ForSystemConfig ConfigSource systemConfig, ScriptingContainer jruby) { this.systemConfig = systemConfig; @@ -54,12 +80,12 @@ @Override public void transaction(ConfigSource config, Schema outputSchema, int inputTaskCount, Control control) { PluginTask task = config.loadConfig(PluginTask.class); if (task.getHosts().isEmpty()) { - log.info("Hosts is empty. Run with a local server."); - try (EmbulkServer _autoclosed = EmbulkServer.start(DEFAULT_HOST.getName(), DEFAULT_HOST.getPort(), 1)) { + log.info("Hosts is empty. Run as the local mode."); + try (EmbulkServer _autoclosed = startEmbulkServer(task)) { control.transaction(outputSchema, inputTaskCount, new ExecutorImpl(inputTaskCount, task, Collections.singletonList(DEFAULT_HOST))); } catch (IOException e) { throw new UncheckedIOException(e); } } else { @@ -101,11 +127,19 @@ String pluginTaskJson = modelManager.writeObject(pluginTask); String processTaskJson = modelManager.writeObject(processTask); ClientSession session = new ClientSession( systemConfigJson, pluginTaskJson, processTaskJson, gemSpecs, pluginArchiveBytes, state, inputTaskCount, modelManager); - try (EmbulkClient client = EmbulkClient.open(session, hosts)) { + + TLSConfig tlsConfig = null; + if (pluginTask.getTLS()) { + tlsConfig = new TLSConfig(); + pluginTask.getKeyStoreP12().ifPresent(tlsConfig::keyStore); + pluginTask.getTrustStoreP12().ifPresent(tlsConfig::trustStore); + } + + try (EmbulkClient client = EmbulkClient.open(session, hosts, tlsConfig)) { client.createSession(); state.initialize(inputTaskCount, inputTaskCount); for (int i = 0; i < inputTaskCount; i++) { if (state.getOutputTaskState(i).isCommitted()) { @@ -129,7 +163,20 @@ .build(); try (FileOutputStream fos = new FileOutputStream(tempFile)) { return archive.dump(fos); } } + } + + private EmbulkServer startEmbulkServer(PluginTask task) throws IOException { + TLSConfig tlsConfig = null; + if (task.getTLS()) { + tlsConfig = new TLSConfig(); + task.getServerKeyStoreP12().ifPresent(tlsConfig::keyStore); + task.getServerTrustStoreP12().ifPresent(tlsConfig::trustStore); + if (task.getServerEnableClientAuth()) { + tlsConfig.enableClientAuth(true); + } + } + return EmbulkServer.start(DEFAULT_HOST.getName(), DEFAULT_HOST.getPort(), 1, tlsConfig); } }