src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordWriter.java in wonderdog-0.1.1 vs src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordWriter.java in wonderdog-0.2.0

- old
+ new

@@ -18,12 +18,17 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.ExceptionsHelper; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.JsonParseException; @@ -34,10 +39,11 @@ private String defaultIndexName; private String defaultMappingName; private String indexFieldName; private String mappingFieldName; private String idFieldName; + private String routingFieldName; private Integer bulkSize; // Bookkeeping private AtomicLong totalBulkTime = new AtomicLong(); private AtomicLong totalBulkItems = new AtomicLong(); @@ -46,61 +52,86 @@ // Elasticsearch indexing private Node node; private Client client; private volatile BulkRequestBuilder currentRequest; + private boolean transport; + private String transportHost; + private Integer transportPort; // JSON parsing private ObjectMapper mapper; // // == Lifecycle == // - public ElasticSearchStreamingRecordWriter(String defaultIndexName, String defaultMappingName, String indexFieldName, String mappingFieldName, String idFieldName, Integer bulkSize) { - this.defaultIndexName = defaultIndexName; - this.defaultMappingName = defaultMappingName; - this.indexFieldName = indexFieldName; - this.mappingFieldName = mappingFieldName; - this.idFieldName = idFieldName; - this.bulkSize = bulkSize; + public ElasticSearchStreamingRecordWriter(String defaultIndexName, String defaultMappingName, String indexFieldName, String mappingFieldName, String idFieldName, String routingFieldName, Integer bulkSize, boolean transport, String transportHost, Integer transportPort) { + this.defaultIndexName = defaultIndexName; + this.defaultMappingName = defaultMappingName; + this.indexFieldName = indexFieldName; + this.mappingFieldName = mappingFieldName; + this.idFieldName = idFieldName; + this.routingFieldName = routingFieldName; + this.bulkSize = bulkSize; + this.transport = transport; + this.transportHost = transportHost; + this.transportPort = transportPort; LOG.info("Writing "+Integer.toString(bulkSize)+" records per batch"); LOG.info("Using default target /"+defaultIndexName+"/"+defaultMappingName); LOG.info("Records override default target with index field '"+indexFieldName+"', mapping field '"+mappingFieldName+"', and ID field '"+idFieldName); - - startEmbeddedClient(); + if (transport) { + this.client = buildTransportClient(); + } else { + startNode(); + this.client = node.client(); + } this.currentRequest = client.prepareBulk(); this.mapper = new ObjectMapper(); } /** - Start an embedded Elasticsearch client. The client will not be - a data node and will not store data locally. + Build a transport client that will connect to some + Elasticsearch node. + + */ + private Client buildTransportClient() { + LOG.info("Connecting transport client to "+transportHost+":"+Integer.toString(transportPort)); + Settings settings = ImmutableSettings.settingsBuilder().put("client.transport.ignore_cluster_name", "true").build(); + return new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(transportHost, transportPort)); + } - The client will connect to the target Elasticsearch cluster as - a client node, enabling one-hop writes for all data. See - http://www.elasticsearch.org/guide/reference/java-api/client.html + /** + Start an embedded Elasticsearch node. + + The node will not store any data locally (non-datanode) but + will connect to a cluster using the default Elasticsearch + settings (those available in + /etc/elasticsearch/elasticsearch.yml). */ - private void startEmbeddedClient() { + private void startNode() { LOG.info("Starting embedded Elasticsearch client (non-datanode)..."); this.node = NodeBuilder.nodeBuilder().client(true).node(); - this.client = node.client(); LOG.info("Successfully joined Elasticsearch cluster '"+ClusterName.clusterNameFromSettings(node.settings())+'"'); } /** Close the Elasticsearch client, sending out one last bulk write if necessary. */ public void close(Reporter reporter) throws IOException { sendBulkRequestIfMoreThan(0); - LOG.info("Shutting down Elasticsearch client..."); - if (client != null) client.close(); - if (node != null) node.close(); - LOG.info("Successfully shut down Elasticsearch client"); + if (client != null) { + LOG.info("Shutting down Elasticsearch client..."); + client.close(); + } + if (node != null) { + LOG.info("Shutting down Elasticsearch node..."); + node.close(); + } } // // == Writing records == // @@ -120,15 +151,21 @@ } } private void index(String json) throws IOException { Map<String, Object> record = mapper.readValue(json, Map.class); + IndexRequest request = null; if (record.containsKey(idFieldName)) { Object idValue = record.get(idFieldName); - currentRequest.add(Requests.indexRequest(indexNameForRecord(record)).id(String.valueOf(idValue)).type(mappingNameForRecord(record)).create(false).source(json)); + request = Requests.indexRequest(indexNameForRecord(record)).id(String.valueOf(idValue)).type(mappingNameForRecord(record)).create(false).source(json); } else { - currentRequest.add(Requests.indexRequest(indexNameForRecord(record)).type(mappingNameForRecord(record)).source(json)); + request = Requests.indexRequest(indexNameForRecord(record)).type(mappingNameForRecord(record)).source(json); } + if (record.containsKey(routingFieldName)) { + Object routingValue = record.get(routingFieldName); + request.routing(String.valueOf(routingValue)); + } + currentRequest.add(request); } private String indexNameForRecord(Map<String, Object> record) { if (record.containsKey(indexFieldName)) { Object indexValue = record.get(indexFieldName);