lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-2.0.0.rc.5 vs lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-2.0.0.rc.6

- old
+ new

@@ -4,10 +4,12 @@ module Fluent::Plugin class ElasticsearchOutputDynamic < ElasticsearchOutput Fluent::Plugin.register_output('elasticsearch_dynamic', self) + helpers :event_emitter + config_param :delimiter, :string, :default => "." DYNAMIC_PARAM_NAMES = %W[hosts host port logstash_format logstash_prefix logstash_dateformat time_key utc_index index_name tag_key type_name id_key parent_key routing_key write_operation] DYNAMIC_PARAM_SYMBOLS = DYNAMIC_PARAM_NAMES.map { |n| "@#{n}".to_sym } @@ -79,11 +81,11 @@ port: (host_str.split(':')[1] || @port).to_i, scheme: @scheme } else # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic - uri = URI(host_str) + uri = URI(get_escaped_userinfo(host_str)) %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key| hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == '' hash end end @@ -126,20 +128,26 @@ tag = chunk.metadata.tag chunk.msgpack_each do |time, record| next unless record.is_a? Hash - # evaluate all configurations here - DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i| - k = DYNAMIC_PARAM_NAMES[i] - v = self.instance_variable_get(var) - # check here to determine if we should evaluate - if dynamic_conf[k] != v - value = expand_param(v, tag, time, record) - dynamic_conf[k] = value - end - } + begin + # evaluate all configurations here + DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i| + k = DYNAMIC_PARAM_NAMES[i] + v = self.instance_variable_get(var) + # check here to determine if we should evaluate + if dynamic_conf[k] != v + value = expand_param(v, tag, time, record) + dynamic_conf[k] = value + end + } # end eval all configs + rescue => e + # handle dynamic parameters misconfigurations + router.emit_error_event(tag, time, record, e) + next + end if eval_or_val(dynamic_conf['logstash_format']) if record.has_key?("@timestamp") time = Time.parse record["@timestamp"] elsif record.has_key?(dynamic_conf['time_key'])