src/main/java/org/embulk/output/elasticsearch/ElasticsearchRecordBuffer.java in embulk-output-elasticsearch-0.4.3 vs src/main/java/org/embulk/output/elasticsearch/ElasticsearchRecordBuffer.java in embulk-output-elasticsearch-0.4.4

- old
+ new

@@ -10,11 +10,10 @@ import org.embulk.base.restclient.record.RecordBuffer; import org.embulk.base.restclient.record.ServiceRecord; import org.embulk.config.TaskReport; import org.embulk.output.elasticsearch.ElasticsearchOutputPluginDelegate.PluginTask; import org.embulk.spi.Exec; -import org.embulk.util.retryhelper.jetty92.Jetty92RetryHelper; import org.slf4j.Logger; import java.io.IOException; /** @@ -26,26 +25,24 @@ private final String attributeName; private final PluginTask task; private final long bulkActions; private final long bulkSize; private final ElasticsearchHttpClient client; - private final Jetty92RetryHelper retryHelper; private final ObjectMapper mapper; private final Logger log; private long totalCount; private int requestCount; private long requestBytes; private ArrayNode records; - public ElasticsearchRecordBuffer(String attributeName, PluginTask task, Jetty92RetryHelper retryHelper) + public ElasticsearchRecordBuffer(String attributeName, PluginTask task) { this.attributeName = attributeName; this.task = task; this.bulkActions = task.getBulkActions(); this.bulkSize = task.getBulkSize(); this.client = new ElasticsearchHttpClient(); - this.retryHelper = retryHelper; this.mapper = new ObjectMapper() .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) .configure(com.fasterxml.jackson.core.JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, false); this.records = JsonNodeFactory.instance.arrayNode(); this.totalCount = 0; @@ -66,11 +63,11 @@ totalCount++; requestBytes += record.toString().getBytes().length; records.add(record); if (requestCount >= bulkActions || requestBytes >= bulkSize) { - client.push(records, task, retryHelper); + client.push(records, task); if (totalCount % 10000 == 0) { log.info("Inserted {} records", totalCount); } records = JsonNodeFactory.instance.arrayNode(); requestBytes = 0; @@ -84,19 +81,24 @@ throw Throwables.propagate(ex); } } @Override + public void finish() + { + } + + @Override + public void close() + { + } + + @Override public TaskReport commitWithTaskReportUpdated(TaskReport taskReport) { - try { - if (records.size() > 0) { - client.push(records, task, retryHelper); - log.info("Inserted {} records", records.size()); - } - return Exec.newTaskReport().set("inserted", totalCount); + if (records.size() > 0) { + client.push(records, task); + log.info("Inserted {} records", records.size()); } - finally { - this.retryHelper.close(); - } + return Exec.newTaskReport().set("inserted", totalCount); } }