src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingOutputFormat.java in wonderdog-0.1.1 vs src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingOutputFormat.java in wonderdog-0.2.0
- old
+ new
@@ -35,25 +35,32 @@
private static final String ES_DEFAULT_MAPPING = "streaming_record";
private String defaultMappingName;
private static final String ES_INDEX_FIELD_OPT = "elasticsearch.output.index.field";
private static final String ES_INDEX_FIELD = "_index";
- private String indexFieldName;
private static final String ES_MAPPING_FIELD_OPT = "elasticsearch.output.mapping.field";
private static final String ES_MAPPING_FIELD = "_mapping";
- private String mappingFieldName;
private static final String ES_ID_FIELD_OPT = "elasticsearch.output.id.field";
private static final String ES_ID_FIELD = "_id";
- private String idFieldName;
+
+ private static final String ES_ROUTING_FIELD_OPT = "elasticsearch.output.routing.field";
+ private static final String ES_ROUTING_FIELD = "_routing";
private static final String ES_BULK_SIZE_OPT = "elasticsearch.output.bulk_size";
private static final String ES_BULK_SIZE = "1000";
- private int bulkSize;
+ private static final String ES_TRANSPORT_OPT = "elasticsearch.transport";
+ private static final String ES_TRANSPORT = "false";
+ private static final String ES_TRANSPORT_HOST_OPT = "elasticsearch.transport.host";
+ private static final String ES_TRANSPORT_HOST = "localhost";
+
+ private static final String ES_TRANSPORT_PORT_OPT = "elasticsearch.transport.port";
+ private static final String ES_TRANSPORT_PORT = "9300";
+
// Elasticsearch internal settings required to make a client
// connection.
private static final String ES_CONFIG_OPT = "es.config";
private static final String ES_CONFIG = "/etc/elasticsearch/elasticsearch.yml";
@@ -65,11 +72,19 @@
String defaultIndexName = conf.get(ES_INDEX_OPT, ES_DEFAULT_INDEX);
String defaultMappingName = conf.get(ES_MAPPING_OPT, ES_DEFAULT_MAPPING);
String indexFieldName = conf.get(ES_INDEX_FIELD_OPT, ES_INDEX_FIELD);
String mappingFieldName = conf.get(ES_MAPPING_FIELD_OPT, ES_MAPPING_FIELD);
String idFieldName = conf.get(ES_ID_FIELD_OPT, ES_ID_FIELD);
+ String routingFieldName = conf.get(ES_ROUTING_FIELD_OPT, ES_ROUTING_FIELD);
Integer bulkSize = Integer.parseInt(conf.get(ES_BULK_SIZE_OPT, ES_BULK_SIZE));
- return (RecordWriter) new ElasticSearchStreamingRecordWriter(defaultIndexName, defaultMappingName, indexFieldName, mappingFieldName, idFieldName, bulkSize);
+ boolean esTransport = new Boolean(conf.get(ES_TRANSPORT_OPT, ES_TRANSPORT));
+ String esTransportHost = conf.get(ES_TRANSPORT_HOST_OPT, ES_TRANSPORT_HOST);
+ Integer esTransportPort = Integer.parseInt(conf.get(ES_TRANSPORT_PORT_OPT, ES_TRANSPORT_PORT));
+
+ return (RecordWriter) new ElasticSearchStreamingRecordWriter(defaultIndexName, defaultMappingName,
+ indexFieldName, mappingFieldName, idFieldName, routingFieldName,
+ bulkSize,
+ esTransport, esTransportHost, esTransportPort);
}
public void setLocalElasticSearchInstallation(JobConf conf) {
String esConfigPath = conf.get(ES_CONFIG_OPT, ES_CONFIG);
String esPluginsPath = conf.get(ES_PLUGINS_OPT, ES_PLUGINS);