lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-1.4.0 vs lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-1.5.0

- old
+ new

@@ -9,10 +9,11 @@ # params overloaded as strings config_param :port, :string, :default => "9200" config_param :logstash_format, :string, :default => "false" config_param :utc_index, :string, :default => "true" + config_param :time_key_exclude_timestamp, :bool, :default => false config_param :reload_connections, :string, :default => "true" config_param :reload_on_failure, :string, :default => "false" config_param :resurrect_after, :string, :default => "60" config_param :ssl_verify, :string, :dfeault => "true" @@ -129,11 +130,11 @@ if eval(dynamic_conf['logstash_format']) if record.has_key?("@timestamp") time = Time.parse record["@timestamp"] elsif record.has_key?(dynamic_conf['time_key']) time = Time.parse record[dynamic_conf['time_key']] - record['@timestamp'] = record[dynamic_conf['time_key']] + record['@timestamp'] = record[dynamic_conf['time_key']] unless time_key_exclude_timestamp else record.merge!({"@timestamp" => Time.at(time).to_datetime.to_s}) end if eval(dynamic_conf['utc_index']) @@ -142,27 +143,35 @@ target_index = "#{dynamic_conf['logstash_prefix']}-#{Time.at(time).strftime("#{dynamic_conf['logstash_dateformat']}")}" end else target_index = dynamic_conf['index_name'] end - + + # Change target_index to lower-case since Elasticsearch doesn't + # allow upper-case characters in index names. + target_index = target_index.downcase + if @include_tag_key record.merge!(dynamic_conf['tag_key'] => tag) end meta = {"_index" => target_index, "_type" => dynamic_conf['type_name']} - if dynamic_conf['id_key'] && record[dynamic_conf['id_key']] - meta['_id'] = record[dynamic_conf['id_key']] - end - if dynamic_conf['parent_key'] && record[dynamic_conf['parent_key']] - meta['_parent'] = record[dynamic_conf['parent_key']] + @meta_config_map ||= { 'id_key' => '_id', 'parent_key' => '_parent', 'routing_key' => '_routing' } + @meta_config_map.each_pair do |config_name, meta_key| + if dynamic_conf[config_name] && record[dynamic_conf[config_name]] + meta[meta_key] = record[dynamic_conf[config_name]] + end end if dynamic_conf['hosts'] host = dynamic_conf['hosts'] else host = "#{dynamic_conf['host']}:#{dynamic_conf['port']}" + end + + if @remove_keys + @remove_keys.each { |key| record.delete(key) } end append_record_to_messages(dynamic_conf["write_operation"], meta, record, bulk_message[host]) end