lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-2.0.0.rc.5 vs lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-2.0.0.rc.6
- old
+ new
@@ -4,10 +4,12 @@
module Fluent::Plugin
class ElasticsearchOutputDynamic < ElasticsearchOutput
Fluent::Plugin.register_output('elasticsearch_dynamic', self)
+ helpers :event_emitter
+
config_param :delimiter, :string, :default => "."
DYNAMIC_PARAM_NAMES = %W[hosts host port logstash_format logstash_prefix logstash_dateformat time_key utc_index index_name tag_key type_name id_key parent_key routing_key write_operation]
DYNAMIC_PARAM_SYMBOLS = DYNAMIC_PARAM_NAMES.map { |n| "@#{n}".to_sym }
@@ -79,11 +81,11 @@
port: (host_str.split(':')[1] || @port).to_i,
scheme: @scheme
}
else
# New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
- uri = URI(host_str)
+ uri = URI(get_escaped_userinfo(host_str))
%w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == ''
hash
end
end
@@ -126,20 +128,26 @@
tag = chunk.metadata.tag
chunk.msgpack_each do |time, record|
next unless record.is_a? Hash
- # evaluate all configurations here
- DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i|
- k = DYNAMIC_PARAM_NAMES[i]
- v = self.instance_variable_get(var)
- # check here to determine if we should evaluate
- if dynamic_conf[k] != v
- value = expand_param(v, tag, time, record)
- dynamic_conf[k] = value
- end
- }
+ begin
+ # evaluate all configurations here
+ DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i|
+ k = DYNAMIC_PARAM_NAMES[i]
+ v = self.instance_variable_get(var)
+ # check here to determine if we should evaluate
+ if dynamic_conf[k] != v
+ value = expand_param(v, tag, time, record)
+ dynamic_conf[k] = value
+ end
+ }
# end eval all configs
+ rescue => e
+ # handle dynamic parameters misconfigurations
+ router.emit_error_event(tag, time, record, e)
+ next
+ end
if eval_or_val(dynamic_conf['logstash_format'])
if record.has_key?("@timestamp")
time = Time.parse record["@timestamp"]
elsif record.has_key?(dynamic_conf['time_key'])