src/main/java/org/embulk/output/elasticsearch/ElasticsearchRecordBuffer.java in embulk-output-elasticsearch-0.4.3 vs src/main/java/org/embulk/output/elasticsearch/ElasticsearchRecordBuffer.java in embulk-output-elasticsearch-0.4.4
- old
+ new
@@ -10,11 +10,10 @@
import org.embulk.base.restclient.record.RecordBuffer;
import org.embulk.base.restclient.record.ServiceRecord;
import org.embulk.config.TaskReport;
import org.embulk.output.elasticsearch.ElasticsearchOutputPluginDelegate.PluginTask;
import org.embulk.spi.Exec;
-import org.embulk.util.retryhelper.jetty92.Jetty92RetryHelper;
import org.slf4j.Logger;
import java.io.IOException;
/**
@@ -26,26 +25,24 @@
private final String attributeName;
private final PluginTask task;
private final long bulkActions;
private final long bulkSize;
private final ElasticsearchHttpClient client;
- private final Jetty92RetryHelper retryHelper;
private final ObjectMapper mapper;
private final Logger log;
private long totalCount;
private int requestCount;
private long requestBytes;
private ArrayNode records;
- public ElasticsearchRecordBuffer(String attributeName, PluginTask task, Jetty92RetryHelper retryHelper)
+ public ElasticsearchRecordBuffer(String attributeName, PluginTask task)
{
this.attributeName = attributeName;
this.task = task;
this.bulkActions = task.getBulkActions();
this.bulkSize = task.getBulkSize();
this.client = new ElasticsearchHttpClient();
- this.retryHelper = retryHelper;
this.mapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(com.fasterxml.jackson.core.JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, false);
this.records = JsonNodeFactory.instance.arrayNode();
this.totalCount = 0;
@@ -66,11 +63,11 @@
totalCount++;
requestBytes += record.toString().getBytes().length;
records.add(record);
if (requestCount >= bulkActions || requestBytes >= bulkSize) {
- client.push(records, task, retryHelper);
+ client.push(records, task);
if (totalCount % 10000 == 0) {
log.info("Inserted {} records", totalCount);
}
records = JsonNodeFactory.instance.arrayNode();
requestBytes = 0;
@@ -84,19 +81,24 @@
throw Throwables.propagate(ex);
}
}
@Override
+ public void finish()
+ {
+ }
+
+ @Override
+ public void close()
+ {
+ }
+
+ @Override
public TaskReport commitWithTaskReportUpdated(TaskReport taskReport)
{
- try {
- if (records.size() > 0) {
- client.push(records, task, retryHelper);
- log.info("Inserted {} records", records.size());
- }
- return Exec.newTaskReport().set("inserted", totalCount);
+ if (records.size() > 0) {
+ client.push(records, task);
+ log.info("Inserted {} records", records.size());
}
- finally {
- this.retryHelper.close();
- }
+ return Exec.newTaskReport().set("inserted", totalCount);
}
}