src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingInputFormat.java in wonderdog-0.1.1 vs src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingInputFormat.java in wonderdog-0.2.0

- old
+ new

@@ -61,16 +61,28 @@ private static final String ES_PLUGINS_OPT = "es.path.plugins"; private static final String ES_PLUGINS = "/usr/local/share/elasticsearch/plugins"; private static final String ES_UNICAST_HOSTS_NAME = "discovery.zen.ping.unicast.hosts"; + + private static final String ES_TRANSPORT_OPT = "elasticsearch.transport"; + private static final String ES_TRANSPORT = "false"; + + private static final String ES_TRANSPORT_HOST_OPT = "elasticsearch.transport.host"; + private static final String ES_TRANSPORT_HOST = "localhost"; + + private static final String ES_TRANSPORT_PORT_OPT = "elasticsearch.transport.port"; + private static final String ES_TRANSPORT_PORT = "9300"; private TransportClient client; public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) { setLocalElasticSearchInstallation(conf); - return (RecordReader) new ElasticSearchStreamingRecordReader(split, conf); + boolean esTransport = new Boolean(conf.get(ES_TRANSPORT_OPT, ES_TRANSPORT)); + String esTransportHost = conf.get(ES_TRANSPORT_HOST_OPT, ES_TRANSPORT_HOST); + Integer esTransportPort = Integer.parseInt(conf.get(ES_TRANSPORT_PORT_OPT, ES_TRANSPORT_PORT)); + return (RecordReader) new ElasticSearchStreamingRecordReader(split, conf, esTransport, esTransportHost, esTransportPort); } public InputSplit[] getSplits(JobConf conf, int requestedNumSplits) { this.numSplits = requestedNumSplits; @@ -183,10 +195,10 @@ request.setSearchType(SearchType.COUNT); if (queryJSON != null && queryJSON.length() > 0) { request.setQuery(queryJSON); } SearchResponse response = request.execute().actionGet(); - this.numHits = response.hits().totalHits(); + this.numHits = response.getHits().totalHits(); LOG.info("Ran query: "+String.valueOf(numHits)+" hits"); } //