src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingInputFormat.java in wonderdog-0.1.1 vs src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingInputFormat.java in wonderdog-0.2.0
- old
+ new
@@ -61,16 +61,28 @@
private static final String ES_PLUGINS_OPT = "es.path.plugins";
private static final String ES_PLUGINS = "/usr/local/share/elasticsearch/plugins";
private static final String ES_UNICAST_HOSTS_NAME = "discovery.zen.ping.unicast.hosts";
+
+ private static final String ES_TRANSPORT_OPT = "elasticsearch.transport";
+ private static final String ES_TRANSPORT = "false";
+
+ private static final String ES_TRANSPORT_HOST_OPT = "elasticsearch.transport.host";
+ private static final String ES_TRANSPORT_HOST = "localhost";
+
+ private static final String ES_TRANSPORT_PORT_OPT = "elasticsearch.transport.port";
+ private static final String ES_TRANSPORT_PORT = "9300";
private TransportClient client;
public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) {
setLocalElasticSearchInstallation(conf);
- return (RecordReader) new ElasticSearchStreamingRecordReader(split, conf);
+ boolean esTransport = new Boolean(conf.get(ES_TRANSPORT_OPT, ES_TRANSPORT));
+ String esTransportHost = conf.get(ES_TRANSPORT_HOST_OPT, ES_TRANSPORT_HOST);
+ Integer esTransportPort = Integer.parseInt(conf.get(ES_TRANSPORT_PORT_OPT, ES_TRANSPORT_PORT));
+ return (RecordReader) new ElasticSearchStreamingRecordReader(split, conf, esTransport, esTransportHost, esTransportPort);
}
public InputSplit[] getSplits(JobConf conf, int requestedNumSplits) {
this.numSplits = requestedNumSplits;
@@ -183,10 +195,10 @@
request.setSearchType(SearchType.COUNT);
if (queryJSON != null && queryJSON.length() > 0) {
request.setQuery(queryJSON);
}
SearchResponse response = request.execute().actionGet();
- this.numHits = response.hits().totalHits();
+ this.numHits = response.getHits().totalHits();
LOG.info("Ran query: "+String.valueOf(numHits)+" hits");
}
//