lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-1.4.0 vs lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-1.5.0
- old
+ new
@@ -9,10 +9,11 @@
# params overloaded as strings
config_param :port, :string, :default => "9200"
config_param :logstash_format, :string, :default => "false"
config_param :utc_index, :string, :default => "true"
+ config_param :time_key_exclude_timestamp, :bool, :default => false
config_param :reload_connections, :string, :default => "true"
config_param :reload_on_failure, :string, :default => "false"
config_param :resurrect_after, :string, :default => "60"
config_param :ssl_verify, :string, :dfeault => "true"
@@ -129,11 +130,11 @@
if eval(dynamic_conf['logstash_format'])
if record.has_key?("@timestamp")
time = Time.parse record["@timestamp"]
elsif record.has_key?(dynamic_conf['time_key'])
time = Time.parse record[dynamic_conf['time_key']]
- record['@timestamp'] = record[dynamic_conf['time_key']]
+ record['@timestamp'] = record[dynamic_conf['time_key']] unless time_key_exclude_timestamp
else
record.merge!({"@timestamp" => Time.at(time).to_datetime.to_s})
end
if eval(dynamic_conf['utc_index'])
@@ -142,27 +143,35 @@
target_index = "#{dynamic_conf['logstash_prefix']}-#{Time.at(time).strftime("#{dynamic_conf['logstash_dateformat']}")}"
end
else
target_index = dynamic_conf['index_name']
end
-
+
+ # Change target_index to lower-case since Elasticsearch doesn't
+ # allow upper-case characters in index names.
+ target_index = target_index.downcase
+
if @include_tag_key
record.merge!(dynamic_conf['tag_key'] => tag)
end
meta = {"_index" => target_index, "_type" => dynamic_conf['type_name']}
- if dynamic_conf['id_key'] && record[dynamic_conf['id_key']]
- meta['_id'] = record[dynamic_conf['id_key']]
- end
- if dynamic_conf['parent_key'] && record[dynamic_conf['parent_key']]
- meta['_parent'] = record[dynamic_conf['parent_key']]
+ @meta_config_map ||= { 'id_key' => '_id', 'parent_key' => '_parent', 'routing_key' => '_routing' }
+ @meta_config_map.each_pair do |config_name, meta_key|
+ if dynamic_conf[config_name] && record[dynamic_conf[config_name]]
+ meta[meta_key] = record[dynamic_conf[config_name]]
+ end
end
if dynamic_conf['hosts']
host = dynamic_conf['hosts']
else
host = "#{dynamic_conf['host']}:#{dynamic_conf['port']}"
+ end
+
+ if @remove_keys
+ @remove_keys.each { |key| record.delete(key) }
end
append_record_to_messages(dynamic_conf["write_operation"], meta, record, bulk_message[host])
end