src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java in embulk-output-elasticsearch-0.4.3 vs src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java in embulk-output-elasticsearch-0.4.4

- old
+ new

@@ -2,21 +2,23 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import org.eclipse.jetty.client.HttpResponseException; +import com.google.common.base.Throwables; import org.eclipse.jetty.client.util.StringContentProvider; import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.util.ssl.SslContextFactory; import org.embulk.config.ConfigException; import org.embulk.config.UserDataException; import org.embulk.output.elasticsearch.ElasticsearchOutputPluginDelegate.AuthMethod; import org.embulk.output.elasticsearch.ElasticsearchOutputPluginDelegate.NodeAddressTask; import org.embulk.output.elasticsearch.ElasticsearchOutputPluginDelegate.PluginTask; import org.embulk.spi.DataException; import org.embulk.spi.Exec; import org.embulk.spi.time.Timestamp; +import org.embulk.util.retryhelper.jetty92.Jetty92ClientCreator; import org.embulk.util.retryhelper.jetty92.Jetty92RetryHelper; import org.embulk.util.retryhelper.jetty92.Jetty92SingleRequester; import org.embulk.util.retryhelper.jetty92.StringJetty92ResponseEntityReader; import org.slf4j.Logger; @@ -52,11 +54,11 @@ public ElasticsearchHttpClient() { this.log = Exec.getLogger(getClass()); } - public void push(JsonNode records, PluginTask task, Jetty92RetryHelper retryHelper) + public void push(JsonNode records, PluginTask task) { int bulkActions = task.getBulkActions(); long bulkSize = task.getBulkSize(); // curl -xPOST localhost:9200/{index}/{type}/_bulk -d ' // {"index" : {}}\n @@ -76,42 +78,42 @@ String requestString = jsonMapper.writeValueAsString(record); sb.append("\n") .append(requestString) .append("\n"); } - sendRequest(path, HttpMethod.POST, task, retryHelper, sb.toString()); + sendRequest(path, HttpMethod.POST, task, sb.toString()); } } catch (JsonProcessingException ex) { throw new DataException(ex); } } - public List<String> getIndexByAlias(String aliasName, PluginTask task, Jetty92RetryHelper retryHelper) + public List<String> getIndexByAlias(String aliasName, PluginTask task) { // curl -XGET localhost:9200/_alias/{alias} // No alias: 404 // Alias found: {"embulk_20161018-183738":{"aliases":{"embulk":{}}}} List<String> indices = new ArrayList<>(); String path = String.format("/_alias/%s", aliasName); - JsonNode response = sendRequest(path, HttpMethod.GET, task, retryHelper); + JsonNode response = sendRequest(path, HttpMethod.GET, task); Iterator it = response.fieldNames(); while (it.hasNext()) { indices.add(it.next().toString()); } return indices; } - public boolean isIndexExisting(String indexName, PluginTask task, Jetty92RetryHelper retryHelper) + public boolean isIndexExisting(String indexName, PluginTask task) { // curl -XGET localhost:9200/{index} // No index: 404 // Index found: 200 try { - sendRequest(indexName, HttpMethod.GET, task, retryHelper); + sendRequest(indexName, HttpMethod.GET, task); return true; } catch (ResourceNotFoundException ex) { return false; } @@ -121,16 +123,16 @@ { Timestamp time = Exec.getTransactionTime(); return indexName + new SimpleDateFormat("_yyyyMMdd-HHmmss").format(time.toEpochMilli()); } - public boolean isAliasExisting(String aliasName, PluginTask task, Jetty92RetryHelper retryHelper) + public boolean isAliasExisting(String aliasName, PluginTask task) { // curl -XGET localhost:9200/_aliases // List all aliases // No aliases: {} // Aliases found: {"embulk_20161018-183738":{"aliases":{"embulk":{}}}} - JsonNode response = sendRequest("/_aliases", HttpMethod.GET, task, retryHelper); + JsonNode response = sendRequest("/_aliases", HttpMethod.GET, task); if (response.size() == 0) { return false; } for (JsonNode index : response) { if (index.has("aliases") && index.get("aliases").has(aliasName)) { @@ -141,28 +143,28 @@ } // Should be called just once while Embulk transaction. // Be sure to call after all exporting tasks completed // This method will delete existing index - public void reassignAlias(String aliasName, String newIndexName, PluginTask task, Jetty92RetryHelper retryHelper) + public void reassignAlias(String aliasName, String newIndexName, PluginTask task) { - if (!isAliasExisting(aliasName, task, retryHelper)) { - assignAlias(newIndexName, aliasName, task, retryHelper); + if (!isAliasExisting(aliasName, task)) { + assignAlias(newIndexName, aliasName, task); } else { - List<String> oldIndices = getIndexByAlias(aliasName, task, retryHelper); - assignAlias(newIndexName, aliasName, task, retryHelper); + List<String> oldIndices = getIndexByAlias(aliasName, task); + assignAlias(newIndexName, aliasName, task); for (String index : oldIndices) { - deleteIndex(index, task, retryHelper); + deleteIndex(index, task); } } } - public String getEsVersion(PluginTask task, Jetty92RetryHelper retryHelper) + public String getEsVersion(PluginTask task) { // curl -XGET 'http://localhost:9200’ - JsonNode response = sendRequest("", HttpMethod.GET, task, retryHelper); + JsonNode response = sendRequest("", HttpMethod.GET, task); return response.get("version").get("number").asText(); } public void validateIndexOrAliasName(String index, String type) { @@ -206,22 +208,22 @@ // {"index" : {}} return "{\"index\" : {}}"; } } - private void assignAlias(String indexName, String aliasName, PluginTask task, Jetty92RetryHelper retryHelper) + private void assignAlias(String indexName, String aliasName, PluginTask task) { try { - if (isIndexExisting(indexName, task, retryHelper)) { - if (isAliasExisting(aliasName, task, retryHelper)) { + if (isIndexExisting(indexName, task)) { + if (isAliasExisting(aliasName, task)) { // curl -XPUT http://localhost:9200/_alias -d\ // "actions" : [ // {"remove" : {"alias" : "{alias}", "index" : "{index_old}"}}, // {"add" : {"alias": "{alias}", "index": "{index_new}"}} // ] // Success: {"acknowledged":true} - List<String> oldIndices = getIndexByAlias(aliasName, task, retryHelper); + List<String> oldIndices = getIndexByAlias(aliasName, task); Map<String, String> newAlias = new HashMap<>(); newAlias.put("alias", aliasName); newAlias.put("index", indexName); Map<String, Map> add = new HashMap<>(); @@ -241,45 +243,45 @@ actions.add(add); Map<String, List> rootTree = new HashMap<>(); rootTree.put("actions", actions); String content = jsonMapper.writeValueAsString(rootTree); - sendRequest("/_aliases", HttpMethod.POST, task, retryHelper, content); + sendRequest("/_aliases", HttpMethod.POST, task, content); log.info("Reassigned alias [{}] to index[{}]", aliasName, indexName); } else { // curl -XPUT http://localhost:9200/{index}/_alias/{alias} // Success: {"acknowledged":true} String path = String.format("/%s/_alias/%s", indexName, aliasName); - sendRequest(path, HttpMethod.PUT, task, retryHelper); + sendRequest(path, HttpMethod.PUT, task); log.info("Assigned alias [{}] to Index [{}]", aliasName, indexName); } } } catch (JsonProcessingException ex) { throw new ConfigException(String.format("Failed to assign alias[%s] to index[%s]", aliasName, indexName)); } } - private void deleteIndex(String indexName, PluginTask task, Jetty92RetryHelper retryHelper) + private void deleteIndex(String indexName, PluginTask task) { // curl -XDELETE localhost:9200/{index} // Success: {"acknowledged":true} - if (isIndexExisting(indexName, task, retryHelper)) { - waitSnapshot(task, retryHelper); - sendRequest(indexName, HttpMethod.DELETE, task, retryHelper); + if (isIndexExisting(indexName, task)) { + waitSnapshot(task); + sendRequest(indexName, HttpMethod.DELETE, task); log.info("Deleted Index [{}]", indexName); } } - private void waitSnapshot(PluginTask task, Jetty92RetryHelper retryHelper) + private void waitSnapshot(PluginTask task) { int maxSnapshotWaitingMills = task.getMaxSnapshotWaitingSecs() * 1000; long execCount = 1; long totalWaitingTime = 0; // Since only needs exponential backoff, don't need exception handling and others, I don't use Embulk RetryExecutor - while (isSnapshotProgressing(task, retryHelper)) { + while (isSnapshotProgressing(task)) { long sleepTime = ((long) Math.pow(2, execCount) * 1000); try { Thread.sleep(sleepTime); } catch (InterruptedException ex) { @@ -294,69 +296,71 @@ throw new ConfigException(String.format("Waiting creating snapshot is expired. %s sec.", maxSnapshotWaitingMills)); } } } - private boolean isSnapshotProgressing(PluginTask task, Jetty92RetryHelper retryHelper) + private boolean isSnapshotProgressing(PluginTask task) { // https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html#_snapshot_status // curl -XGET localhost:9200/_snapshot/_status - JsonNode response = sendRequest("/_snapshot/_status", HttpMethod.GET, task, retryHelper); + JsonNode response = sendRequest("/_snapshot/_status", HttpMethod.GET, task); String snapshots = response.get("snapshots").asText(); return !snapshots.equals(""); } - private JsonNode sendRequest(String path, final HttpMethod method, PluginTask task, Jetty92RetryHelper retryHelper) + private JsonNode sendRequest(String path, final HttpMethod method, PluginTask task) { - return sendRequest(path, method, task, retryHelper, ""); + return sendRequest(path, method, task, ""); } - private JsonNode sendRequest(String path, final HttpMethod method, final PluginTask task, Jetty92RetryHelper retryHelper, final String content) + private JsonNode sendRequest(String path, final HttpMethod method, final PluginTask task, final String content) { final String uri = createRequestUri(task, path); final String authorizationHeader = getAuthorizationHeader(task); - String responseBody = retryHelper.requestWithRetry( - new StringJetty92ResponseEntityReader(task.getTimeoutMills()), - new Jetty92SingleRequester() { - @Override - public void requestOnce(org.eclipse.jetty.client.HttpClient client, org.eclipse.jetty.client.api.Response.Listener responseListener) - { - org.eclipse.jetty.client.api.Request request = client - .newRequest(uri) - .accept("application/json") - .method(method); - if (method == HttpMethod.POST) { - request.content(new StringContentProvider(content), "application/json"); + try (Jetty92RetryHelper retryHelper = createRetryHelper(task)) { + String responseBody = retryHelper.requestWithRetry( + new StringJetty92ResponseEntityReader(task.getTimeoutMills()), + new Jetty92SingleRequester() { + @Override + public void requestOnce(org.eclipse.jetty.client.HttpClient client, org.eclipse.jetty.client.api.Response.Listener responseListener) + { + org.eclipse.jetty.client.api.Request request = client + .newRequest(uri) + .accept("application/json") + .method(method); + if (method == HttpMethod.POST) { + request.content(new StringContentProvider(content), "application/json"); + } + + if (!authorizationHeader.isEmpty()) { + request.header("Authorization", authorizationHeader); + } + request.send(responseListener); } - if (!authorizationHeader.isEmpty()) { - request.header("Authorization", authorizationHeader); + @Override + public boolean isExceptionToRetry(Exception exception) + { + return task.getId().isPresent(); } - request.send(responseListener); - } - @Override - public boolean isExceptionToRetry(Exception exception) - { - return task.getId().isPresent(); - } - - @Override - public boolean isResponseStatusToRetry(org.eclipse.jetty.client.api.Response response) - { - int status = response.getStatus(); - if (status == 404) { - throw new ResourceNotFoundException("Requested resource was not found"); + @Override + public boolean isResponseStatusToRetry(org.eclipse.jetty.client.api.Response response) + { + int status = response.getStatus(); + if (status == 404) { + throw new ResourceNotFoundException("Requested resource was not found"); + } + else if (status == 429) { + return true; // Retry if 429. + } + return status / 100 != 4; // Retry unless 4xx except for 429. } - else if (status == 429) { - return true; // Retry if 429. - } - return status / 100 != 4; // Retry unless 4xx except for 429. - } - }); - return parseJson(responseBody); + }); + return parseJson(responseBody); + } } private String createRequestUri(PluginTask task, String path) { if (!path.startsWith("/")) { @@ -383,9 +387,31 @@ return this.jsonMapper.readTree(json); } catch (IOException ex) { throw new DataException(ex); } + } + + private Jetty92RetryHelper createRetryHelper(PluginTask task) + { + return new Jetty92RetryHelper( + task.getMaximumRetries(), + task.getInitialRetryIntervalMillis(), + task.getMaximumRetryIntervalMillis(), + new Jetty92ClientCreator() { + @Override + public org.eclipse.jetty.client.HttpClient createAndStart() + { + org.eclipse.jetty.client.HttpClient client = new org.eclipse.jetty.client.HttpClient(new SslContextFactory()); + try { + client.start(); + return client; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + }); } @VisibleForTesting protected String getAuthorizationHeader(PluginTask task) {