src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java in embulk-output-elasticsearch-0.4.3 vs src/main/java/org/embulk/output/elasticsearch/ElasticsearchHttpClient.java in embulk-output-elasticsearch-0.4.4
- old
+ new
@@ -2,21 +2,23 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import org.eclipse.jetty.client.HttpResponseException;
+import com.google.common.base.Throwables;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.embulk.config.ConfigException;
import org.embulk.config.UserDataException;
import org.embulk.output.elasticsearch.ElasticsearchOutputPluginDelegate.AuthMethod;
import org.embulk.output.elasticsearch.ElasticsearchOutputPluginDelegate.NodeAddressTask;
import org.embulk.output.elasticsearch.ElasticsearchOutputPluginDelegate.PluginTask;
import org.embulk.spi.DataException;
import org.embulk.spi.Exec;
import org.embulk.spi.time.Timestamp;
+import org.embulk.util.retryhelper.jetty92.Jetty92ClientCreator;
import org.embulk.util.retryhelper.jetty92.Jetty92RetryHelper;
import org.embulk.util.retryhelper.jetty92.Jetty92SingleRequester;
import org.embulk.util.retryhelper.jetty92.StringJetty92ResponseEntityReader;
import org.slf4j.Logger;
@@ -52,11 +54,11 @@
public ElasticsearchHttpClient()
{
this.log = Exec.getLogger(getClass());
}
- public void push(JsonNode records, PluginTask task, Jetty92RetryHelper retryHelper)
+ public void push(JsonNode records, PluginTask task)
{
int bulkActions = task.getBulkActions();
long bulkSize = task.getBulkSize();
// curl -xPOST localhost:9200/{index}/{type}/_bulk -d '
// {"index" : {}}\n
@@ -76,42 +78,42 @@
String requestString = jsonMapper.writeValueAsString(record);
sb.append("\n")
.append(requestString)
.append("\n");
}
- sendRequest(path, HttpMethod.POST, task, retryHelper, sb.toString());
+ sendRequest(path, HttpMethod.POST, task, sb.toString());
}
}
catch (JsonProcessingException ex) {
throw new DataException(ex);
}
}
- public List<String> getIndexByAlias(String aliasName, PluginTask task, Jetty92RetryHelper retryHelper)
+ public List<String> getIndexByAlias(String aliasName, PluginTask task)
{
// curl -XGET localhost:9200/_alias/{alias}
// No alias: 404
// Alias found: {"embulk_20161018-183738":{"aliases":{"embulk":{}}}}
List<String> indices = new ArrayList<>();
String path = String.format("/_alias/%s", aliasName);
- JsonNode response = sendRequest(path, HttpMethod.GET, task, retryHelper);
+ JsonNode response = sendRequest(path, HttpMethod.GET, task);
Iterator it = response.fieldNames();
while (it.hasNext()) {
indices.add(it.next().toString());
}
return indices;
}
- public boolean isIndexExisting(String indexName, PluginTask task, Jetty92RetryHelper retryHelper)
+ public boolean isIndexExisting(String indexName, PluginTask task)
{
// curl -XGET localhost:9200/{index}
// No index: 404
// Index found: 200
try {
- sendRequest(indexName, HttpMethod.GET, task, retryHelper);
+ sendRequest(indexName, HttpMethod.GET, task);
return true;
}
catch (ResourceNotFoundException ex) {
return false;
}
@@ -121,16 +123,16 @@
{
Timestamp time = Exec.getTransactionTime();
return indexName + new SimpleDateFormat("_yyyyMMdd-HHmmss").format(time.toEpochMilli());
}
- public boolean isAliasExisting(String aliasName, PluginTask task, Jetty92RetryHelper retryHelper)
+ public boolean isAliasExisting(String aliasName, PluginTask task)
{
// curl -XGET localhost:9200/_aliases // List all aliases
// No aliases: {}
// Aliases found: {"embulk_20161018-183738":{"aliases":{"embulk":{}}}}
- JsonNode response = sendRequest("/_aliases", HttpMethod.GET, task, retryHelper);
+ JsonNode response = sendRequest("/_aliases", HttpMethod.GET, task);
if (response.size() == 0) {
return false;
}
for (JsonNode index : response) {
if (index.has("aliases") && index.get("aliases").has(aliasName)) {
@@ -141,28 +143,28 @@
}
// Should be called just once while Embulk transaction.
// Be sure to call after all exporting tasks completed
// This method will delete existing index
- public void reassignAlias(String aliasName, String newIndexName, PluginTask task, Jetty92RetryHelper retryHelper)
+ public void reassignAlias(String aliasName, String newIndexName, PluginTask task)
{
- if (!isAliasExisting(aliasName, task, retryHelper)) {
- assignAlias(newIndexName, aliasName, task, retryHelper);
+ if (!isAliasExisting(aliasName, task)) {
+ assignAlias(newIndexName, aliasName, task);
}
else {
- List<String> oldIndices = getIndexByAlias(aliasName, task, retryHelper);
- assignAlias(newIndexName, aliasName, task, retryHelper);
+ List<String> oldIndices = getIndexByAlias(aliasName, task);
+ assignAlias(newIndexName, aliasName, task);
for (String index : oldIndices) {
- deleteIndex(index, task, retryHelper);
+ deleteIndex(index, task);
}
}
}
- public String getEsVersion(PluginTask task, Jetty92RetryHelper retryHelper)
+ public String getEsVersion(PluginTask task)
{
// curl -XGET 'http://localhost:9200’
- JsonNode response = sendRequest("", HttpMethod.GET, task, retryHelper);
+ JsonNode response = sendRequest("", HttpMethod.GET, task);
return response.get("version").get("number").asText();
}
public void validateIndexOrAliasName(String index, String type)
{
@@ -206,22 +208,22 @@
// {"index" : {}}
return "{\"index\" : {}}";
}
}
- private void assignAlias(String indexName, String aliasName, PluginTask task, Jetty92RetryHelper retryHelper)
+ private void assignAlias(String indexName, String aliasName, PluginTask task)
{
try {
- if (isIndexExisting(indexName, task, retryHelper)) {
- if (isAliasExisting(aliasName, task, retryHelper)) {
+ if (isIndexExisting(indexName, task)) {
+ if (isAliasExisting(aliasName, task)) {
// curl -XPUT http://localhost:9200/_alias -d\
// "actions" : [
// {"remove" : {"alias" : "{alias}", "index" : "{index_old}"}},
// {"add" : {"alias": "{alias}", "index": "{index_new}"}}
// ]
// Success: {"acknowledged":true}
- List<String> oldIndices = getIndexByAlias(aliasName, task, retryHelper);
+ List<String> oldIndices = getIndexByAlias(aliasName, task);
Map<String, String> newAlias = new HashMap<>();
newAlias.put("alias", aliasName);
newAlias.put("index", indexName);
Map<String, Map> add = new HashMap<>();
@@ -241,45 +243,45 @@
actions.add(add);
Map<String, List> rootTree = new HashMap<>();
rootTree.put("actions", actions);
String content = jsonMapper.writeValueAsString(rootTree);
- sendRequest("/_aliases", HttpMethod.POST, task, retryHelper, content);
+ sendRequest("/_aliases", HttpMethod.POST, task, content);
log.info("Reassigned alias [{}] to index[{}]", aliasName, indexName);
}
else {
// curl -XPUT http://localhost:9200/{index}/_alias/{alias}
// Success: {"acknowledged":true}
String path = String.format("/%s/_alias/%s", indexName, aliasName);
- sendRequest(path, HttpMethod.PUT, task, retryHelper);
+ sendRequest(path, HttpMethod.PUT, task);
log.info("Assigned alias [{}] to Index [{}]", aliasName, indexName);
}
}
}
catch (JsonProcessingException ex) {
throw new ConfigException(String.format("Failed to assign alias[%s] to index[%s]", aliasName, indexName));
}
}
- private void deleteIndex(String indexName, PluginTask task, Jetty92RetryHelper retryHelper)
+ private void deleteIndex(String indexName, PluginTask task)
{
// curl -XDELETE localhost:9200/{index}
// Success: {"acknowledged":true}
- if (isIndexExisting(indexName, task, retryHelper)) {
- waitSnapshot(task, retryHelper);
- sendRequest(indexName, HttpMethod.DELETE, task, retryHelper);
+ if (isIndexExisting(indexName, task)) {
+ waitSnapshot(task);
+ sendRequest(indexName, HttpMethod.DELETE, task);
log.info("Deleted Index [{}]", indexName);
}
}
- private void waitSnapshot(PluginTask task, Jetty92RetryHelper retryHelper)
+ private void waitSnapshot(PluginTask task)
{
int maxSnapshotWaitingMills = task.getMaxSnapshotWaitingSecs() * 1000;
long execCount = 1;
long totalWaitingTime = 0;
// Since only needs exponential backoff, don't need exception handling and others, I don't use Embulk RetryExecutor
- while (isSnapshotProgressing(task, retryHelper)) {
+ while (isSnapshotProgressing(task)) {
long sleepTime = ((long) Math.pow(2, execCount) * 1000);
try {
Thread.sleep(sleepTime);
}
catch (InterruptedException ex) {
@@ -294,69 +296,71 @@
throw new ConfigException(String.format("Waiting creating snapshot is expired. %s sec.", maxSnapshotWaitingMills));
}
}
}
- private boolean isSnapshotProgressing(PluginTask task, Jetty92RetryHelper retryHelper)
+ private boolean isSnapshotProgressing(PluginTask task)
{
// https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html#_snapshot_status
// curl -XGET localhost:9200/_snapshot/_status
- JsonNode response = sendRequest("/_snapshot/_status", HttpMethod.GET, task, retryHelper);
+ JsonNode response = sendRequest("/_snapshot/_status", HttpMethod.GET, task);
String snapshots = response.get("snapshots").asText();
return !snapshots.equals("");
}
- private JsonNode sendRequest(String path, final HttpMethod method, PluginTask task, Jetty92RetryHelper retryHelper)
+ private JsonNode sendRequest(String path, final HttpMethod method, PluginTask task)
{
- return sendRequest(path, method, task, retryHelper, "");
+ return sendRequest(path, method, task, "");
}
- private JsonNode sendRequest(String path, final HttpMethod method, final PluginTask task, Jetty92RetryHelper retryHelper, final String content)
+ private JsonNode sendRequest(String path, final HttpMethod method, final PluginTask task, final String content)
{
final String uri = createRequestUri(task, path);
final String authorizationHeader = getAuthorizationHeader(task);
- String responseBody = retryHelper.requestWithRetry(
- new StringJetty92ResponseEntityReader(task.getTimeoutMills()),
- new Jetty92SingleRequester() {
- @Override
- public void requestOnce(org.eclipse.jetty.client.HttpClient client, org.eclipse.jetty.client.api.Response.Listener responseListener)
- {
- org.eclipse.jetty.client.api.Request request = client
- .newRequest(uri)
- .accept("application/json")
- .method(method);
- if (method == HttpMethod.POST) {
- request.content(new StringContentProvider(content), "application/json");
+ try (Jetty92RetryHelper retryHelper = createRetryHelper(task)) {
+ String responseBody = retryHelper.requestWithRetry(
+ new StringJetty92ResponseEntityReader(task.getTimeoutMills()),
+ new Jetty92SingleRequester() {
+ @Override
+ public void requestOnce(org.eclipse.jetty.client.HttpClient client, org.eclipse.jetty.client.api.Response.Listener responseListener)
+ {
+ org.eclipse.jetty.client.api.Request request = client
+ .newRequest(uri)
+ .accept("application/json")
+ .method(method);
+ if (method == HttpMethod.POST) {
+ request.content(new StringContentProvider(content), "application/json");
+ }
+
+ if (!authorizationHeader.isEmpty()) {
+ request.header("Authorization", authorizationHeader);
+ }
+ request.send(responseListener);
}
- if (!authorizationHeader.isEmpty()) {
- request.header("Authorization", authorizationHeader);
+ @Override
+ public boolean isExceptionToRetry(Exception exception)
+ {
+ return task.getId().isPresent();
}
- request.send(responseListener);
- }
- @Override
- public boolean isExceptionToRetry(Exception exception)
- {
- return task.getId().isPresent();
- }
-
- @Override
- public boolean isResponseStatusToRetry(org.eclipse.jetty.client.api.Response response)
- {
- int status = response.getStatus();
- if (status == 404) {
- throw new ResourceNotFoundException("Requested resource was not found");
+ @Override
+ public boolean isResponseStatusToRetry(org.eclipse.jetty.client.api.Response response)
+ {
+ int status = response.getStatus();
+ if (status == 404) {
+ throw new ResourceNotFoundException("Requested resource was not found");
+ }
+ else if (status == 429) {
+ return true; // Retry if 429.
+ }
+ return status / 100 != 4; // Retry unless 4xx except for 429.
}
- else if (status == 429) {
- return true; // Retry if 429.
- }
- return status / 100 != 4; // Retry unless 4xx except for 429.
- }
- });
- return parseJson(responseBody);
+ });
+ return parseJson(responseBody);
+ }
}
private String createRequestUri(PluginTask task, String path)
{
if (!path.startsWith("/")) {
@@ -383,9 +387,31 @@
return this.jsonMapper.readTree(json);
}
catch (IOException ex) {
throw new DataException(ex);
}
+ }
+
+ private Jetty92RetryHelper createRetryHelper(PluginTask task)
+ {
+ return new Jetty92RetryHelper(
+ task.getMaximumRetries(),
+ task.getInitialRetryIntervalMillis(),
+ task.getMaximumRetryIntervalMillis(),
+ new Jetty92ClientCreator() {
+ @Override
+ public org.eclipse.jetty.client.HttpClient createAndStart()
+ {
+ org.eclipse.jetty.client.HttpClient client = new org.eclipse.jetty.client.HttpClient(new SslContextFactory());
+ try {
+ client.start();
+ return client;
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ });
}
@VisibleForTesting
protected String getAuthorizationHeader(PluginTask task)
{