lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-1.10.0 vs lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-1.10.1

- old
+ new

@@ -78,11 +78,11 @@ port: (host_str.split(':')[1] || @port).to_i, scheme: @scheme } else # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic - uri = URI(host_str) + uri = URI(get_escaped_userinfo(host_str)) %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key| hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == '' hash end end @@ -119,20 +119,26 @@ } chunk.msgpack_each do |time, record| next unless record.is_a? Hash - # evaluate all configurations here - DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i| - k = DYNAMIC_PARAM_NAMES[i] - v = self.instance_variable_get(var) - # check here to determine if we should evaluate - if dynamic_conf[k] != v - value = expand_param(v, tag, time, record) - dynamic_conf[k] = value - end - } + begin + # evaluate all configurations here + DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i| + k = DYNAMIC_PARAM_NAMES[i] + v = self.instance_variable_get(var) + # check here to determine if we should evaluate + if dynamic_conf[k] != v + value = expand_param(v, tag, time, record) + dynamic_conf[k] = value + end + } # end eval all configs + rescue => e + # handle dynamic parameters misconfigurations + router.emit_error_event(tag, time, record, e) + next + end if eval_or_val(dynamic_conf['logstash_format']) if record.has_key?("@timestamp") time = Time.parse record["@timestamp"] elsif record.has_key?(dynamic_conf['time_key'])