lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-1.18.2 vs lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-2.0.0.rc.1

- old
+ new

@@ -1,282 +1,268 @@ # encoding: UTF-8 require_relative 'out_elasticsearch' -class Fluent::ElasticsearchOutputDynamic < Fluent::ElasticsearchOutput +module Fluent::Plugin + class ElasticsearchOutputDynamic < ElasticsearchOutput - Fluent::Plugin.register_output('elasticsearch_dynamic', self) + Fluent::Plugin.register_output('elasticsearch_dynamic', self) - config_param :delimiter, :string, :default => "." + config_param :delimiter, :string, :default => "." - DYNAMIC_PARAM_NAMES = %W[hosts host port include_timestamp logstash_format logstash_prefix logstash_dateformat time_key utc_index index_name tag_key type_name id_key parent_key routing_key write_operation] - DYNAMIC_PARAM_SYMBOLS = DYNAMIC_PARAM_NAMES.map { |n| "@#{n}".to_sym } + DYNAMIC_PARAM_NAMES = %W[hosts host port logstash_format logstash_prefix logstash_dateformat time_key utc_index index_name tag_key type_name id_key parent_key routing_key write_operation] + DYNAMIC_PARAM_SYMBOLS = DYNAMIC_PARAM_NAMES.map { |n| "@#{n}".to_sym } - attr_reader :dynamic_config + attr_reader :dynamic_config - def configure(conf) - super + def configure(conf) + super - # evaluate all configurations here - @dynamic_config = {} - DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i| - value = expand_param(self.instance_variable_get(var), nil, nil, nil) - key = DYNAMIC_PARAM_NAMES[i] - @dynamic_config[key] = value.to_s - } - # end eval all configs - @current_config = nil - end + # evaluate all configurations here + @dynamic_config = {} + DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i| + value = expand_param(self.instance_variable_get(var), nil, nil, nil) + key = DYNAMIC_PARAM_NAMES[i] + @dynamic_config[key] = value.to_s + } + # end eval all configs + @current_config = nil + end - def create_meta_config_map - {'id_key' => '_id', 'parent_key' => '_parent', 'routing_key' => '_routing'} - end + def create_meta_config_map + {'id_key' => '_id', 'parent_key' => '_parent', 'routing_key' => '_routing'} + end - def client(host = nil) + def client(host) - # check here to see if we already have a client connection for the given host - connection_options = get_connection_options(host) + # check here to see if we already have a client connection for the given host + connection_options = get_connection_options(host) - @_es = nil unless is_existing_connection(connection_options[:hosts]) + @_es = nil unless is_existing_connection(connection_options[:hosts]) - @_es ||= begin - @current_config = connection_options[:hosts].clone - excon_options = { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass } - adapter_conf = lambda {|f| f.adapter :excon, excon_options } - transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(connection_options.merge( - options: { - reload_connections: @reload_connections, - reload_on_failure: @reload_on_failure, - resurrect_after: @resurrect_after, - retry_on_failure: 5, - logger: @transport_logger, - transport_options: { - headers: { 'Content-Type' => 'application/json' }, - request: { timeout: @request_timeout }, - ssl: { verify: @ssl_verify, ca_file: @ca_file } - }, - http: { - user: @user, - password: @password - } - }), &adapter_conf) - es = Elasticsearch::Client.new transport: transport + @_es ||= begin + @current_config = connection_options[:hosts].clone + excon_options = { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass } + adapter_conf = lambda {|f| f.adapter :excon, excon_options } + transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(connection_options.merge( + options: { + reload_connections: @reload_connections, + reload_on_failure: @reload_on_failure, + resurrect_after: @resurrect_after, + retry_on_failure: 5, + transport_options: { + headers: { 'Content-Type' => 'application/json' }, + request: { timeout: @request_timeout }, + ssl: { verify: @ssl_verify, ca_file: @ca_file } + } + }), &adapter_conf) + es = Elasticsearch::Client.new transport: transport - begin - raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description(host)})!" unless es.ping - rescue *es.transport.host_unreachable_exceptions => e - raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description(host)})! #{e.message}" - end + begin + raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description(host)})!" unless es.ping + rescue *es.transport.host_unreachable_exceptions => e + raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description(host)})! #{e.message}" + end - log.info "Connection opened to Elasticsearch cluster => #{connection_options_description(host)}" - es + log.info "Connection opened to Elasticsearch cluster => #{connection_options_description(host)}" + es + end end - end - def get_connection_options(con_host) - raise "`password` must be present if `user` is present" if @user && !@password + def get_connection_options(con_host) + raise "`password` must be present if `user` is present" if @user && !@password - hosts = if con_host || @hosts - (con_host || @hosts).split(',').map do |host_str| - # Support legacy hosts format host:port,host:port,host:port... - if host_str.match(%r{^[^:]+(\:\d+)?$}) - { - host: host_str.split(':')[0], - port: (host_str.split(':')[1] || @port).to_i, - scheme: @scheme.to_s - } - else - # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic - uri = URI(get_escaped_userinfo(host_str)) - %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key| - hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == '' - hash + hosts = if con_host || @hosts + (con_host || @hosts).split(',').map do |host_str| + # Support legacy hosts format host:port,host:port,host:port... + if host_str.match(%r{^[^:]+(\:\d+)?$}) + { + host: host_str.split(':')[0], + port: (host_str.split(':')[1] || @port).to_i, + scheme: @scheme + } + else + # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic + uri = URI(host_str) + %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key| + hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == '' + hash + end end - end - end.compact - else - [{host: @host, port: @port.to_i, scheme: @scheme.to_s}] - end.each do |host| - host.merge!(user: @user, password: @password) if !host[:user] && @user - host.merge!(path: @path) if !host[:path] && @path + end.compact + else + [{host: @host, port: @port.to_i, scheme: @scheme}] + end.each do |host| + host.merge!(user: @user, password: @password) if !host[:user] && @user + host.merge!(path: @path) if !host[:path] && @path + end + + { + hosts: hosts + } end - { - hosts: hosts - } - end + def connection_options_description(host) + get_connection_options(host)[:hosts].map do |host_info| + attributes = host_info.dup + attributes[:password] = 'obfuscated' if attributes.has_key?(:password) + attributes.inspect + end.join(', ') + end - def connection_options_description(host) - get_connection_options(host)[:hosts].map do |host_info| - attributes = host_info.dup - attributes[:password] = 'obfuscated' if attributes.has_key?(:password) - attributes.inspect - end.join(', ') - end + def write(chunk) + bulk_message = Hash.new { |h,k| h[k] = '' } + dynamic_conf = @dynamic_config.clone - def write_objects(tag, chunk) - bulk_message = Hash.new { |h,k| h[k] = '' } - dynamic_conf = @dynamic_config.clone + headers = { + UPDATE_OP => {}, + UPSERT_OP => {}, + CREATE_OP => {}, + INDEX_OP => {} + } - headers = { - UPDATE_OP => {}, - UPSERT_OP => {}, - CREATE_OP => {}, - INDEX_OP => {} - } + tag = chunk.metadata.tag - chunk.msgpack_each do |time, record| - next unless record.is_a? Hash + chunk.msgpack_each do |time, record| + next unless record.is_a? Hash - if @hash_config - record = generate_hash_id_key(record) - end - - begin # evaluate all configurations here DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i| k = DYNAMIC_PARAM_NAMES[i] v = self.instance_variable_get(var) # check here to determine if we should evaluate if dynamic_conf[k] != v value = expand_param(v, tag, time, record) dynamic_conf[k] = value end } - # end eval all configs - rescue => e - # handle dynamic parameters misconfigurations - router.emit_error_event(tag, time, record, e) - next - end + # end eval all configs - if eval_or_val(dynamic_conf['logstash_format']) || eval_or_val(dynamic_conf['include_timestamp']) - if record.has_key?("@timestamp") - time = Time.parse record["@timestamp"] - elsif record.has_key?(dynamic_conf['time_key']) - time = Time.parse record[dynamic_conf['time_key']] - record['@timestamp'] = record[dynamic_conf['time_key']] unless time_key_exclude_timestamp + if eval_or_val(dynamic_conf['logstash_format']) + if record.has_key?("@timestamp") + time = Time.parse record["@timestamp"] + elsif record.has_key?(dynamic_conf['time_key']) + time = Time.parse record[dynamic_conf['time_key']] + record['@timestamp'] = record[dynamic_conf['time_key']] unless time_key_exclude_timestamp + else + record.merge!({"@timestamp" => Time.at(time).to_datetime.to_s}) + end + + if eval_or_val(dynamic_conf['utc_index']) + target_index = "#{dynamic_conf['logstash_prefix']}-#{Time.at(time).getutc.strftime("#{dynamic_conf['logstash_dateformat']}")}" + else + target_index = "#{dynamic_conf['logstash_prefix']}-#{Time.at(time).strftime("#{dynamic_conf['logstash_dateformat']}")}" + end else - record.merge!({"@timestamp" => Time.at(time).to_datetime.to_s}) + target_index = dynamic_conf['index_name'] end - end - if eval_or_val(dynamic_conf['logstash_format']) - if eval_or_val(dynamic_conf['utc_index']) - target_index = "#{dynamic_conf['logstash_prefix']}-#{Time.at(time).getutc.strftime("#{dynamic_conf['logstash_dateformat']}")}" - else - target_index = "#{dynamic_conf['logstash_prefix']}-#{Time.at(time).strftime("#{dynamic_conf['logstash_dateformat']}")}" + # Change target_index to lower-case since Elasticsearch doesn't + # allow upper-case characters in index names. + target_index = target_index.downcase + + if @include_tag_key + record.merge!(dynamic_conf['tag_key'] => tag) end - else - target_index = dynamic_conf['index_name'] - end - # Change target_index to lower-case since Elasticsearch doesn't - # allow upper-case characters in index names. - target_index = target_index.downcase + meta = {"_index" => target_index, "_type" => dynamic_conf['type_name']} - if @include_tag_key - record.merge!(dynamic_conf['tag_key'] => tag) - end + @meta_config_map.each_pair do |config_name, meta_key| + if dynamic_conf[config_name] && record[dynamic_conf[config_name]] + meta[meta_key] = record[dynamic_conf[config_name]] + end + end - meta = {"_index" => target_index, "_type" => dynamic_conf['type_name']} + if dynamic_conf['hosts'] + host = dynamic_conf['hosts'] + else + host = "#{dynamic_conf['host']}:#{dynamic_conf['port']}" + end - @meta_config_map.each_pair do |config_name, meta_key| - if dynamic_conf[config_name] && record[dynamic_conf[config_name]] - meta[meta_key] = record[dynamic_conf[config_name]] + if @remove_keys + @remove_keys.each { |key| record.delete(key) } end - end - if dynamic_conf['hosts'] - host = dynamic_conf['hosts'] - else - host = "#{dynamic_conf['host']}:#{dynamic_conf['port']}" + write_op = dynamic_conf["write_operation"] + append_record_to_messages(write_op, meta, headers[write_op], record, bulk_message[host]) end - if @remove_keys - @remove_keys.each { |key| record.delete(key) } + bulk_message.each do |hKey, msgs| + send_bulk(msgs, hKey) unless msgs.empty? + msgs.clear end - - write_op = dynamic_conf["write_operation"] - append_record_to_messages(write_op, meta, headers[write_op], record, bulk_message[host]) end - bulk_message.each do |hKey, msgs| - send_bulk(msgs, hKey) unless msgs.empty? - msgs.clear + def send_bulk(data, host) + retries = 0 + begin + response = client(host).bulk body: data + if response['errors'] + log.error "Could not push log to Elasticsearch: #{response}" + end + rescue *client(host).transport.host_unreachable_exceptions => e + if retries < 2 + retries += 1 + @_es = nil + log.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}" + sleep 2**retries + retry + end + raise ConnectionFailure, "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}" + rescue Exception + @_es = nil if @reconnect_on_error + raise + end end - end - def send_bulk(data, host) - retries = 0 - begin - response = client(host).bulk body: data - if response['errors'] - log.error "Could not push log to Elasticsearch: #{response}" - end - rescue *client(host).transport.host_unreachable_exceptions => e - if retries < 2 - retries += 1 - @_es = nil - log.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}" - sleep 2**retries - retry - end - raise ConnectionFailure, "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}" - rescue Exception - @_es = nil if @reconnect_on_error - raise + def eval_or_val(var) + return var unless var.is_a?(String) + eval(var) end - end - def eval_or_val(var) - return var unless var.is_a?(String) - eval(var) - end + def expand_param(param, tag, time, record) + # check for '${ ... }' + # yes => `eval` + # no => return param + return param if (param =~ /\${.+}/).nil? - def expand_param(param, tag, time, record) - # check for '${ ... }' - # yes => `eval` - # no => return param - return param if (param =~ /\${.+}/).nil? + # check for 'tag_parts[]' + # separated by a delimiter (default '.') + tag_parts = tag.split(@delimiter) unless (param =~ /tag_parts\[.+\]/).nil? || tag.nil? - # check for 'tag_parts[]' - # separated by a delimiter (default '.') - tag_parts = tag.split(@delimiter) unless (param =~ /tag_parts\[.+\]/).nil? || tag.nil? + # pull out section between ${} then eval + inner = param.clone + while inner.match(/\${.+}/) + to_eval = inner.match(/\${(.+?)}/){$1} - # pull out section between ${} then eval - inner = param.clone - while inner.match(/\${.+}/) - to_eval = inner.match(/\${(.+?)}/){$1} - - if !(to_eval =~ /record\[.+\]/).nil? && record.nil? - return to_eval - elsif !(to_eval =~/tag_parts\[.+\]/).nil? && tag_parts.nil? - return to_eval - elsif !(to_eval =~/time/).nil? && time.nil? - return to_eval - else - inner.sub!(/\${.+?}/, eval( to_eval )) + if !(to_eval =~ /record\[.+\]/).nil? && record.nil? + return to_eval + elsif !(to_eval =~/tag_parts\[.+\]/).nil? && tag_parts.nil? + return to_eval + elsif !(to_eval =~/time/).nil? && time.nil? + return to_eval + else + inner.sub!(/\${.+?}/, eval( to_eval )) + end end + inner end - inner - end - def is_valid_expand_param_type(param) - return false if [:@buffer_type].include?(param) - return self.instance_variable_get(param).is_a?(String) - end + def is_valid_expand_param_type(param) + return false if [:@buffer_type].include?(param) + return self.instance_variable_get(param).is_a?(String) + end - def is_existing_connection(host) - # check if the host provided match the current connection - return false if @_es.nil? - return false if @current_config.nil? - return false if host.length != @current_config.length + def is_existing_connection(host) + # check if the host provided match the current connection + return false if @_es.nil? + return false if host.length != @current_config.length - for i in 0...host.length - if !host[i][:host].eql? @current_config[i][:host] || host[i][:port] != @current_config[i][:port] - return false + for i in 0...host.length + if !host[i][:host].eql? @current_config[i][:host] || host[i][:port] != @current_config[i][:port] + return false + end end - end - return true + return true + end end end