src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordReader.java in wonderdog-0.1.1 vs src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordReader.java in wonderdog-0.2.0
- old
+ new
@@ -15,10 +15,14 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.client.Client;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.Scroll;
@@ -40,46 +44,83 @@
private Scroll scroll;
private Node node;
private Client client;
private ElasticSearchStreamingSplit split;
-
+ private boolean transport;
+ private String transportHost;
+ private Integer transportPort;
+
private String scrollId;
private Integer recordsRead;
private Iterator<SearchHit> hitsItr = null;
- public ElasticSearchStreamingRecordReader(InputSplit split, JobConf conf) { // 0.1.1 signature: no transport options, always went through startEmbeddedClient()
+ public ElasticSearchStreamingRecordReader(InputSplit split, JobConf conf, boolean transport, String transportHost, Integer transportPort) { // 0.2.0 signature: transport selects a transport client for transportHost:transportPort instead of an embedded node
this.split = (ElasticSearchStreamingSplit) split; // a split of any other type fails this cast with ClassCastException
this.recordsRead = 0;
this.requestSize = Integer.parseInt(conf.get(ES_REQUEST_SIZE_OPT, ES_REQUEST_SIZE)); // job setting, with ES_REQUEST_SIZE as the fallback; a non-numeric value throws NumberFormatException
this.scrollTimeout = conf.get(ES_SCROLL_TIMEOUT_OPT, ES_SCROLL_TIMEOUT); // kept as text; also used in log messages
this.scroll = new Scroll(TimeValue.parseTimeValue(this.scrollTimeout, defaultScrollTimeout));
+
+ this.transport = transport;
+ this.transportHost = transportHost;
+ this.transportPort = transportPort;
LOG.info("Initializing "+this.split.getSummary());
- startEmbeddedClient();
+ if (transport) {
+ this.client = buildTransportClient();
+ } else {
+ startNode(); // assigns this.node, which the next line reads
+ this.client = node.client();
+ }
fetchNextHits(); // scrollId is still null at this point, so this runs the initial scroll request
}
+ /**
+ Build a transport client that will connect to some
+ Elasticsearch node.
+
+ */
+ private Client buildTransportClient() {
+ LOG.info("Connecting transport client to "+transportHost+":"+Integer.toString(transportPort));
+ Settings settings = ImmutableSettings.settingsBuilder().put("client.transport.ignore_cluster_name", "true").build();
+ return new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(transportHost, transportPort));
+ }
+
+ /**
+ Start an embedded Elasticsearch node and store it in the
+ node field.
+
+ The node will not store any data locally (non-datanode) but
+ will connect to a cluster using the default Elasticsearch
+ settings (those available in
+ /etc/elasticsearch/elasticsearch.yml).
+
+ Logs once before the node is built and once afterwards with
+ the cluster name taken from the node's settings.
+ */
+ private void startNode() {
+     LOG.info("Starting embedded Elasticsearch client (non-datanode)...");
+     this.node = NodeBuilder.nodeBuilder().client(true).node();
+     // Close the quoted cluster name with a single quote so it matches the opening one.
+     LOG.info("Successfully joined Elasticsearch cluster '"+ClusterName.clusterNameFromSettings(node.settings())+"'");
+ }
+
private void fetchNextHits() { // first call (scrollId == null) opens the scroll; every later call fetches the next page and replaces hitsItr
if (scrollId == null) {
LOG.info("Running initial scroll with timeout "+scrollTimeout);
SearchRequestBuilder request = split.initialScrollRequest(client, scroll, requestSize);
SearchResponse response = request.execute().actionGet();
- this.scrollId = response.scrollId();
+ this.scrollId = response.getScrollId();
LOG.info("Got scroll ID "+scrollId);
// NOTE(review): this branch only records the scroll ID; hitsItr
// is left untouched. Open question: does the initial response
// already contain the first set of hits for the scroll, or
// should fetchNextHits() be called again here to load them?
//
// fetchNextHits();
} else {
// Disabled per-page logging:
// LOG.info("Running query for scroll ID "+scrollId+" with timeout "+scrollTimeout);
SearchScrollRequestBuilder request = split.scrollRequest(client, scroll, scrollId);
SearchResponse response = request.execute().actionGet();
- this.scrollId = response.scrollId();
+ this.scrollId = response.getScrollId(); // the scroll ID is refreshed from every response
// LOG.info("Got scroll ID "+scrollId);
- this.hitsItr = response.hits().iterator();
+ this.hitsItr = response.getHits().iterator();
}
}
@Override
public boolean next(K key, V value) throws IOException {
@@ -149,28 +190,16 @@
return ((float) recordsRead) / ((float) split.getSize());
}
@Override
public void close() throws IOException {
- stopEmbeddedClient();
+ if (client != null) {
+ LOG.info("Shutting down Elasticsearch client...");
+ client.close();
+ }
+ if (node != null) {
+ LOG.info("Shutting down Elasticsearch node...");
+ node.close();
+ }
}
-
- //
- // == Connecting to Elasticsearch ==
- //
-
- private void startEmbeddedClient() { // 0.1.1 only: builds a client-mode node and takes its client in one step
- LOG.info("Starting embedded Elasticsearch client (non-datanode)...");
- this.node = NodeBuilder.nodeBuilder().client(true).node();
- this.client = node.client();
- LOG.info("Successfully joined Elasticsearch cluster '"+ClusterName.clusterNameFromSettings(node.settings())+'"'); // NOTE(review): the trailing char literal is a double quote, so the name is logged as 'name"
- }
-
- private void stopEmbeddedClient() { // 0.1.1 only: closes the client, then the node, skipping whichever is null
- LOG.info("Stopping embedded Elasticsearch client...");
- if (client != null) client.close();
- if (node != null) node.close(); // not reached if client.close() throws
- LOG.info("Left Elasticsearch cluster");
- }
-
}