src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordReader.java in wonderdog-0.1.1 vs src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordReader.java in wonderdog-0.2.0

- old
+ new

@@ -15,10 +15,14 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; import org.elasticsearch.client.Client; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.Scroll; @@ -40,46 +44,83 @@ private Scroll scroll; private Node node; private Client client; private ElasticSearchStreamingSplit split; - + private boolean transport; + private String transportHost; + private Integer transportPort; + private String scrollId; private Integer recordsRead; private Iterator<SearchHit> hitsItr = null; - public ElasticSearchStreamingRecordReader(InputSplit split, JobConf conf) { + public ElasticSearchStreamingRecordReader(InputSplit split, JobConf conf, boolean transport, String transportHost, Integer transportPort) { this.split = (ElasticSearchStreamingSplit) split; this.recordsRead = 0; this.requestSize = Integer.parseInt(conf.get(ES_REQUEST_SIZE_OPT, ES_REQUEST_SIZE)); this.scrollTimeout = conf.get(ES_SCROLL_TIMEOUT_OPT, ES_SCROLL_TIMEOUT); this.scroll = new Scroll(TimeValue.parseTimeValue(this.scrollTimeout, defaultScrollTimeout)); + + this.transport = transport; + this.transportHost = transportHost; + this.transportPort = transportPort; LOG.info("Initializing "+this.split.getSummary()); - startEmbeddedClient(); + if (transport) { + this.client = buildTransportClient(); + } else { + startNode(); + this.client = node.client(); + } fetchNextHits(); } + /** + Build a transport client that will connect to some + Elasticsearch node. + + */ + private Client buildTransportClient() { + LOG.info("Connecting transport client to "+transportHost+":"+Integer.toString(transportPort)); + Settings settings = ImmutableSettings.settingsBuilder().put("client.transport.ignore_cluster_name", "true").build(); + return new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(transportHost, transportPort)); + } + + /** + Start an embedded Elasticsearch node. + + The node will not store any data locally (non-datanode) but + will connect to a cluster using the default Elasticsearch + settings (those available in + /etc/elasticsearch/elasticsearch.yml). + */ + private void startNode() { + LOG.info("Starting embedded Elasticsearch client (non-datanode)..."); + this.node = NodeBuilder.nodeBuilder().client(true).node(); + LOG.info("Successfully joined Elasticsearch cluster '"+ClusterName.clusterNameFromSettings(node.settings())+'"'); + } + private void fetchNextHits() { if (scrollId == null) { LOG.info("Running initial scroll with timeout "+scrollTimeout); SearchRequestBuilder request = split.initialScrollRequest(client, scroll, requestSize); SearchResponse response = request.execute().actionGet(); - this.scrollId = response.scrollId(); + this.scrollId = response.getScrollId(); LOG.info("Got scroll ID "+scrollId); // Do we need to call fetchNextHits() again here? Or does // the initial request also itself contain the first set // of hits for the scroll? // // fetchNextHits(); } else { // LOG.info("Running query for scroll ID "+scrollId+" with timeout "+scrollTimeout); SearchScrollRequestBuilder request = split.scrollRequest(client, scroll, scrollId); SearchResponse response = request.execute().actionGet(); - this.scrollId = response.scrollId(); + this.scrollId = response.getScrollId(); // LOG.info("Got scroll ID "+scrollId); - this.hitsItr = response.hits().iterator(); + this.hitsItr = response.getHits().iterator(); } } @Override public boolean next(K key, V value) throws IOException { @@ -149,28 +190,16 @@ return ((float) recordsRead) / ((float) split.getSize()); } @Override public void close() throws IOException { - stopEmbeddedClient(); + if (client != null) { + LOG.info("Shutting down Elasticsearch client..."); + client.close(); + } + if (node != null) { + LOG.info("Shutting down Elasticsearch node..."); + node.close(); + } } - - // - // == Connecting to Elasticsearch == - // - - private void startEmbeddedClient() { - LOG.info("Starting embedded Elasticsearch client (non-datanode)..."); - this.node = NodeBuilder.nodeBuilder().client(true).node(); - this.client = node.client(); - LOG.info("Successfully joined Elasticsearch cluster '"+ClusterName.clusterNameFromSettings(node.settings())+'"'); - } - - private void stopEmbeddedClient() { - LOG.info("Stopping embedded Elasticsearch client..."); - if (client != null) client.close(); - if (node != null) node.close(); - LOG.info("Left Elasticsearch cluster"); - } - }