lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-1.10.0 vs lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-1.10.1
- old
+ new
@@ -78,11 +78,11 @@
port: (host_str.split(':')[1] || @port).to_i,
scheme: @scheme
}
else
# New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
- uri = URI(host_str)
+ uri = URI(get_escaped_userinfo(host_str))
%w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == ''
hash
end
end
@@ -119,20 +119,26 @@
}
chunk.msgpack_each do |time, record|
next unless record.is_a? Hash
- # evaluate all configurations here
- DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i|
- k = DYNAMIC_PARAM_NAMES[i]
- v = self.instance_variable_get(var)
- # check here to determine if we should evaluate
- if dynamic_conf[k] != v
- value = expand_param(v, tag, time, record)
- dynamic_conf[k] = value
- end
- }
+ begin
+ # evaluate all configurations here
+ DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i|
+ k = DYNAMIC_PARAM_NAMES[i]
+ v = self.instance_variable_get(var)
+ # check here to determine if we should evaluate
+ if dynamic_conf[k] != v
+ value = expand_param(v, tag, time, record)
+ dynamic_conf[k] = value
+ end
+ }
# end eval all configs
+ rescue => e
+ # handle dynamic parameters misconfigurations
+ router.emit_error_event(tag, time, record, e)
+ next
+ end
if eval_or_val(dynamic_conf['logstash_format'])
if record.has_key?("@timestamp")
time = Time.parse record["@timestamp"]
elsif record.has_key?(dynamic_conf['time_key'])