src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordWriter.java in wonderdog-0.1.1 vs src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordWriter.java in wonderdog-0.2.0
- old
+ new
@@ -18,12 +18,17 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.ExceptionsHelper;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.JsonParseException;
@@ -34,10 +39,11 @@
private String defaultIndexName;
private String defaultMappingName;
private String indexFieldName;
private String mappingFieldName;
private String idFieldName;
+ private String routingFieldName;
private Integer bulkSize;
// Bookkeeping
private AtomicLong totalBulkTime = new AtomicLong();
private AtomicLong totalBulkItems = new AtomicLong();
@@ -46,61 +52,86 @@
// Elasticsearch indexing
private Node node;
private Client client;
private volatile BulkRequestBuilder currentRequest;
+ private boolean transport;
+ private String transportHost;
+ private Integer transportPort;
// JSON parsing
private ObjectMapper mapper;
//
// == Lifecycle ==
//
- public ElasticSearchStreamingRecordWriter(String defaultIndexName, String defaultMappingName, String indexFieldName, String mappingFieldName, String idFieldName, Integer bulkSize) {
- this.defaultIndexName = defaultIndexName;
- this.defaultMappingName = defaultMappingName;
- this.indexFieldName = indexFieldName;
- this.mappingFieldName = mappingFieldName;
- this.idFieldName = idFieldName;
- this.bulkSize = bulkSize;
+ public ElasticSearchStreamingRecordWriter(String defaultIndexName, String defaultMappingName, String indexFieldName, String mappingFieldName, String idFieldName, String routingFieldName, Integer bulkSize, boolean transport, String transportHost, Integer transportPort) {
+ this.defaultIndexName = defaultIndexName;
+ this.defaultMappingName = defaultMappingName;
+ this.indexFieldName = indexFieldName;
+ this.mappingFieldName = mappingFieldName;
+ this.idFieldName = idFieldName;
+ this.routingFieldName = routingFieldName;
+ this.bulkSize = bulkSize;
+ this.transport = transport;
+ this.transportHost = transportHost;
+ this.transportPort = transportPort;
LOG.info("Writing "+Integer.toString(bulkSize)+" records per batch");
LOG.info("Using default target /"+defaultIndexName+"/"+defaultMappingName);
LOG.info("Records override default target with index field '"+indexFieldName+"', mapping field '"+mappingFieldName+"', and ID field '"+idFieldName);
-
- startEmbeddedClient();
+ if (transport) {
+ this.client = buildTransportClient();
+ } else {
+ startNode();
+ this.client = node.client();
+ }
this.currentRequest = client.prepareBulk();
this.mapper = new ObjectMapper();
}
/**
- Start an embedded Elasticsearch client. The client will not be
- a data node and will not store data locally.
+ Build a transport client that will connect to some
+ Elasticsearch node.
+
+ */
+ private Client buildTransportClient() {
+ LOG.info("Connecting transport client to "+transportHost+":"+Integer.toString(transportPort));
+ Settings settings = ImmutableSettings.settingsBuilder().put("client.transport.ignore_cluster_name", "true").build();
+ return new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(transportHost, transportPort));
+ }
- The client will connect to the target Elasticsearch cluster as
- a client node, enabling one-hop writes for all data. See
- http://www.elasticsearch.org/guide/reference/java-api/client.html
+ /**
+ Start an embedded Elasticsearch node.
+
+ The node will not store any data locally (non-datanode) but
+ will connect to a cluster using the default Elasticsearch
+ settings (those available in
+ /etc/elasticsearch/elasticsearch.yml).
*/
- private void startEmbeddedClient() {
+ private void startNode() {
LOG.info("Starting embedded Elasticsearch client (non-datanode)...");
this.node = NodeBuilder.nodeBuilder().client(true).node();
- this.client = node.client();
LOG.info("Successfully joined Elasticsearch cluster '"+ClusterName.clusterNameFromSettings(node.settings())+'"');
}
/**
Close the Elasticsearch client, sending out one last bulk write
if necessary.
*/
public void close(Reporter reporter) throws IOException {
sendBulkRequestIfMoreThan(0);
- LOG.info("Shutting down Elasticsearch client...");
- if (client != null) client.close();
- if (node != null) node.close();
- LOG.info("Successfully shut down Elasticsearch client");
+ if (client != null) {
+ LOG.info("Shutting down Elasticsearch client...");
+ client.close();
+ }
+ if (node != null) {
+ LOG.info("Shutting down Elasticsearch node...");
+ node.close();
+ }
}
//
// == Writing records ==
//
@@ -120,15 +151,21 @@
}
}
private void index(String json) throws IOException {
Map<String, Object> record = mapper.readValue(json, Map.class);
+ IndexRequest request = null;
if (record.containsKey(idFieldName)) {
Object idValue = record.get(idFieldName);
- currentRequest.add(Requests.indexRequest(indexNameForRecord(record)).id(String.valueOf(idValue)).type(mappingNameForRecord(record)).create(false).source(json));
+ request = Requests.indexRequest(indexNameForRecord(record)).id(String.valueOf(idValue)).type(mappingNameForRecord(record)).create(false).source(json);
} else {
- currentRequest.add(Requests.indexRequest(indexNameForRecord(record)).type(mappingNameForRecord(record)).source(json));
+ request = Requests.indexRequest(indexNameForRecord(record)).type(mappingNameForRecord(record)).source(json);
}
+ if (record.containsKey(routingFieldName)) {
+ Object routingValue = record.get(routingFieldName);
+ request.routing(String.valueOf(routingValue));
+ }
+ currentRequest.add(request);
}
private String indexNameForRecord(Map<String, Object> record) {
if (record.containsKey(indexFieldName)) {
Object indexValue = record.get(indexFieldName);