src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingOutputFormat.java in wonderdog-0.1.1 vs src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingOutputFormat.java in wonderdog-0.2.0

- old
+ new

@@ -35,25 +35,32 @@ private static final String ES_DEFAULT_MAPPING = "streaming_record"; private String defaultMappingName; private static final String ES_INDEX_FIELD_OPT = "elasticsearch.output.index.field"; private static final String ES_INDEX_FIELD = "_index"; - private String indexFieldName; private static final String ES_MAPPING_FIELD_OPT = "elasticsearch.output.mapping.field"; private static final String ES_MAPPING_FIELD = "_mapping"; - private String mappingFieldName; private static final String ES_ID_FIELD_OPT = "elasticsearch.output.id.field"; private static final String ES_ID_FIELD = "_id"; - private String idFieldName; + + private static final String ES_ROUTING_FIELD_OPT = "elasticsearch.output.routing.field"; + private static final String ES_ROUTING_FIELD = "_routing"; private static final String ES_BULK_SIZE_OPT = "elasticsearch.output.bulk_size"; private static final String ES_BULK_SIZE = "1000"; - private int bulkSize; + private static final String ES_TRANSPORT_OPT = "elasticsearch.transport"; + private static final String ES_TRANSPORT = "false"; + private static final String ES_TRANSPORT_HOST_OPT = "elasticsearch.transport.host"; + private static final String ES_TRANSPORT_HOST = "localhost"; + + private static final String ES_TRANSPORT_PORT_OPT = "elasticsearch.transport.port"; + private static final String ES_TRANSPORT_PORT = "9300"; + // Elasticsearch internal settings required to make a client // connection. private static final String ES_CONFIG_OPT = "es.config"; private static final String ES_CONFIG = "/etc/elasticsearch/elasticsearch.yml"; @@ -65,11 +72,19 @@ String defaultIndexName = conf.get(ES_INDEX_OPT, ES_DEFAULT_INDEX); String defaultMappingName = conf.get(ES_MAPPING_OPT, ES_DEFAULT_MAPPING); String indexFieldName = conf.get(ES_INDEX_FIELD_OPT, ES_INDEX_FIELD); String mappingFieldName = conf.get(ES_MAPPING_FIELD_OPT, ES_MAPPING_FIELD); String idFieldName = conf.get(ES_ID_FIELD_OPT, ES_ID_FIELD); + String routingFieldName = conf.get(ES_ROUTING_FIELD_OPT, ES_ROUTING_FIELD); Integer bulkSize = Integer.parseInt(conf.get(ES_BULK_SIZE_OPT, ES_BULK_SIZE)); - return (RecordWriter) new ElasticSearchStreamingRecordWriter(defaultIndexName, defaultMappingName, indexFieldName, mappingFieldName, idFieldName, bulkSize); + boolean esTransport = new Boolean(conf.get(ES_TRANSPORT_OPT, ES_TRANSPORT)); + String esTransportHost = conf.get(ES_TRANSPORT_HOST_OPT, ES_TRANSPORT_HOST); + Integer esTransportPort = Integer.parseInt(conf.get(ES_TRANSPORT_PORT_OPT, ES_TRANSPORT_PORT)); + + return (RecordWriter) new ElasticSearchStreamingRecordWriter(defaultIndexName, defaultMappingName, + indexFieldName, mappingFieldName, idFieldName, routingFieldName, + bulkSize, + esTransport, esTransportHost, esTransportPort); } public void setLocalElasticSearchInstallation(JobConf conf) { String esConfigPath = conf.get(ES_CONFIG_OPT, ES_CONFIG); String esPluginsPath = conf.get(ES_PLUGINS_OPT, ES_PLUGINS);