lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-3.5.4 vs lib/fluent/plugin/out_elasticsearch_dynamic.rb in fluent-plugin-elasticsearch-3.5.5
- old
+ new
@@ -1,262 +1,262 @@
-# encoding: UTF-8
-require_relative 'out_elasticsearch'
-
-module Fluent::Plugin
- class ElasticsearchOutputDynamic < ElasticsearchOutput
-
- Fluent::Plugin.register_output('elasticsearch_dynamic', self)
-
- helpers :event_emitter
-
- config_param :delimiter, :string, :default => "."
-
- DYNAMIC_PARAM_NAMES = %W[hosts host port include_timestamp logstash_format logstash_prefix logstash_dateformat time_key utc_index index_name tag_key type_name id_key parent_key routing_key write_operation]
- DYNAMIC_PARAM_SYMBOLS = DYNAMIC_PARAM_NAMES.map { |n| "@#{n}".to_sym }
-
- RequestInfo = Struct.new(:host, :index)
-
- attr_reader :dynamic_config
-
- def configure(conf)
- super
-
- # evaluate all configurations here
- @dynamic_config = {}
- DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i|
- value = expand_param(self.instance_variable_get(var), nil, nil, nil)
- key = DYNAMIC_PARAM_NAMES[i]
- @dynamic_config[key] = value.to_s
- }
- # end eval all configs
- end
-
- def create_meta_config_map
- {'id_key' => '_id', 'parent_key' => '_parent', 'routing_key' => @routing_key_name}
- end
-
-
- def client(host = nil)
- # check here to see if we already have a client connection for the given host
- connection_options = get_connection_options(host)
-
- @_es = nil unless is_existing_connection(connection_options[:hosts])
-
- @_es ||= begin
- @current_config = connection_options[:hosts].clone
- adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
- transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(connection_options.merge(
- options: {
- reload_connections: @reload_connections,
- reload_on_failure: @reload_on_failure,
- resurrect_after: @resurrect_after,
- logger: @transport_logger,
- transport_options: {
- headers: { 'Content-Type' => @content_type.to_s },
- request: { timeout: @request_timeout },
- ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
- },
- http: {
- user: @user,
- password: @password
- }
- }), &adapter_conf)
- Elasticsearch::Client.new transport: transport
- end
- end
-
- def get_connection_options(con_host)
- raise "`password` must be present if `user` is present" if @user && !@password
-
- hosts = if con_host || @hosts
- (con_host || @hosts).split(',').map do |host_str|
- # Support legacy hosts format host:port,host:port,host:port...
- if host_str.match(%r{^[^:]+(\:\d+)?$})
- {
- host: host_str.split(':')[0],
- port: (host_str.split(':')[1] || @port).to_i,
- scheme: @scheme.to_s
- }
- else
- # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
- uri = URI(get_escaped_userinfo(host_str))
- %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
- hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == ''
- hash
- end
- end
- end.compact
- else
- [{host: @host, port: @port.to_i, scheme: @scheme.to_s}]
- end.each do |host|
- host.merge!(user: @user, password: @password) if !host[:user] && @user
- host.merge!(path: @path) if !host[:path] && @path
- end
-
- {
- hosts: hosts
- }
- end
-
- def connection_options_description(host)
- get_connection_options(host)[:hosts].map do |host_info|
- attributes = host_info.dup
- attributes[:password] = 'obfuscated' if attributes.has_key?(:password)
- attributes.inspect
- end.join(', ')
- end
-
- def multi_workers_ready?
- true
- end
-
- def write(chunk)
- bulk_message = Hash.new { |h,k| h[k] = '' }
- dynamic_conf = @dynamic_config.clone
-
- headers = {
- UPDATE_OP => {},
- UPSERT_OP => {},
- CREATE_OP => {},
- INDEX_OP => {}
- }
-
- tag = chunk.metadata.tag
-
- chunk.msgpack_each do |time, record|
- next unless record.is_a? Hash
-
- begin
- # evaluate all configurations here
- DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i|
- k = DYNAMIC_PARAM_NAMES[i]
- v = self.instance_variable_get(var)
- # check here to determine if we should evaluate
- if dynamic_conf[k] != v
- value = expand_param(v, tag, time, record)
- dynamic_conf[k] = value
- end
- }
- # end eval all configs
- rescue => e
- # handle dynamic parameters misconfigurations
- router.emit_error_event(tag, time, record, e)
- next
- end
-
- if eval_or_val(dynamic_conf['logstash_format']) || eval_or_val(dynamic_conf['include_timestamp'])
- if record.has_key?("@timestamp")
- time = Time.parse record["@timestamp"]
- elsif record.has_key?(dynamic_conf['time_key'])
- time = Time.parse record[dynamic_conf['time_key']]
- record['@timestamp'] = record[dynamic_conf['time_key']] unless time_key_exclude_timestamp
- else
- record.merge!({"@timestamp" => Time.at(time).iso8601(@time_precision)})
- end
- end
-
- if eval_or_val(dynamic_conf['logstash_format'])
- if eval_or_val(dynamic_conf['utc_index'])
- target_index = "#{dynamic_conf['logstash_prefix']}#{@logstash_prefix_separator}#{Time.at(time).getutc.strftime("#{dynamic_conf['logstash_dateformat']}")}"
- else
- target_index = "#{dynamic_conf['logstash_prefix']}#{@logstash_prefix_separator}#{Time.at(time).strftime("#{dynamic_conf['logstash_dateformat']}")}"
- end
- else
- target_index = dynamic_conf['index_name']
- end
-
- # Change target_index to lower-case since Elasticsearch doesn't
- # allow upper-case characters in index names.
- target_index = target_index.downcase
-
- if @include_tag_key
- record.merge!(dynamic_conf['tag_key'] => tag)
- end
-
- if dynamic_conf['hosts']
- host = dynamic_conf['hosts']
- else
- host = "#{dynamic_conf['host']}:#{dynamic_conf['port']}"
- end
-
- if @include_index_in_url
- key = RequestInfo.new(host, target_index)
- meta = {"_type" => dynamic_conf['type_name']}
- else
- key = RequestInfo.new(host, nil)
- meta = {"_index" => target_index, "_type" => dynamic_conf['type_name']}
- end
-
- @meta_config_map.each_pair do |config_name, meta_key|
- if dynamic_conf[config_name] && accessor = record_accessor_create(dynamic_conf[config_name])
- if raw_value = accessor.call(record)
- meta[meta_key] = raw_value
- end
- end
- end
-
- if @remove_keys
- @remove_keys.each { |key| record.delete(key) }
- end
-
- write_op = dynamic_conf["write_operation"]
- append_record_to_messages(write_op, meta, headers[write_op], record, bulk_message[key])
- end
-
- bulk_message.each do |info, msgs|
- send_bulk(msgs, info.host, info.index) unless msgs.empty?
- msgs.clear
- end
- end
-
- def send_bulk(data, host, index)
- begin
- response = client(host).bulk body: data, index: index
- if response['errors']
- log.error "Could not push log to Elasticsearch: #{response}"
- end
- rescue => e
- @_es = nil if @reconnect_on_error
- # FIXME: identify unrecoverable errors and raise UnrecoverableRequestFailure instead
- raise RecoverableRequestFailure, "could not push logs to Elasticsearch cluster (#{connection_options_description(host)}): #{e.message}"
- end
- end
-
- def eval_or_val(var)
- return var unless var.is_a?(String)
- eval(var)
- end
-
- def expand_param(param, tag, time, record)
- # check for '${ ... }'
- # yes => `eval`
- # no => return param
- return param if (param =~ /\${.+}/).nil?
-
- # check for 'tag_parts[]'
- # separated by a delimiter (default '.')
- tag_parts = tag.split(@delimiter) unless (param =~ /tag_parts\[.+\]/).nil? || tag.nil?
-
- # pull out section between ${} then eval
- inner = param.clone
- while inner.match(/\${.+}/)
- to_eval = inner.match(/\${(.+?)}/){$1}
-
- if !(to_eval =~ /record\[.+\]/).nil? && record.nil?
- return to_eval
- elsif !(to_eval =~/tag_parts\[.+\]/).nil? && tag_parts.nil?
- return to_eval
- elsif !(to_eval =~/time/).nil? && time.nil?
- return to_eval
- else
- inner.sub!(/\${.+?}/, eval( to_eval ))
- end
- end
- inner
- end
-
- def is_valid_expand_param_type(param)
- return false if [:@buffer_type].include?(param)
- return self.instance_variable_get(param).is_a?(String)
- end
- end
-end
+# encoding: UTF-8
+require_relative 'out_elasticsearch'
+
+module Fluent::Plugin
+ class ElasticsearchOutputDynamic < ElasticsearchOutput
+
+ Fluent::Plugin.register_output('elasticsearch_dynamic', self)
+
+ helpers :event_emitter
+
+ config_param :delimiter, :string, :default => "."
+
+ DYNAMIC_PARAM_NAMES = %W[hosts host port include_timestamp logstash_format logstash_prefix logstash_dateformat time_key utc_index index_name tag_key type_name id_key parent_key routing_key write_operation]
+ DYNAMIC_PARAM_SYMBOLS = DYNAMIC_PARAM_NAMES.map { |n| "@#{n}".to_sym }
+
+ RequestInfo = Struct.new(:host, :index)
+
+ attr_reader :dynamic_config
+
+ def configure(conf)
+ super
+
+ # evaluate all configurations here
+ @dynamic_config = {}
+ DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i|
+ value = expand_param(self.instance_variable_get(var), nil, nil, nil)
+ key = DYNAMIC_PARAM_NAMES[i]
+ @dynamic_config[key] = value.to_s
+ }
+ # end eval all configs
+ end
+
+ def create_meta_config_map
+ {'id_key' => '_id', 'parent_key' => '_parent', 'routing_key' => @routing_key_name}
+ end
+
+
+ def client(host = nil)
+ # check here to see if we already have a client connection for the given host
+ connection_options = get_connection_options(host)
+
+ @_es = nil unless is_existing_connection(connection_options[:hosts])
+
+ @_es ||= begin
+ @current_config = connection_options[:hosts].clone
+ adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
+ transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(connection_options.merge(
+ options: {
+ reload_connections: @reload_connections,
+ reload_on_failure: @reload_on_failure,
+ resurrect_after: @resurrect_after,
+ logger: @transport_logger,
+ transport_options: {
+ headers: { 'Content-Type' => @content_type.to_s },
+ request: { timeout: @request_timeout },
+ ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
+ },
+ http: {
+ user: @user,
+ password: @password
+ }
+ }), &adapter_conf)
+ Elasticsearch::Client.new transport: transport
+ end
+ end
+
+ def get_connection_options(con_host)
+ raise "`password` must be present if `user` is present" if @user && !@password
+
+ hosts = if con_host || @hosts
+ (con_host || @hosts).split(',').map do |host_str|
+ # Support legacy hosts format host:port,host:port,host:port...
+ if host_str.match(%r{^[^:]+(\:\d+)?$})
+ {
+ host: host_str.split(':')[0],
+ port: (host_str.split(':')[1] || @port).to_i,
+ scheme: @scheme.to_s
+ }
+ else
+ # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
+ uri = URI(get_escaped_userinfo(host_str))
+ %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
+ hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == ''
+ hash
+ end
+ end
+ end.compact
+ else
+ [{host: @host, port: @port.to_i, scheme: @scheme.to_s}]
+ end.each do |host|
+ host.merge!(user: @user, password: @password) if !host[:user] && @user
+ host.merge!(path: @path) if !host[:path] && @path
+ end
+
+ {
+ hosts: hosts
+ }
+ end
+
+ def connection_options_description(host)
+ get_connection_options(host)[:hosts].map do |host_info|
+ attributes = host_info.dup
+ attributes[:password] = 'obfuscated' if attributes.has_key?(:password)
+ attributes.inspect
+ end.join(', ')
+ end
+
+ def multi_workers_ready?
+ true
+ end
+
+ def write(chunk)
+ bulk_message = Hash.new { |h,k| h[k] = '' }
+ dynamic_conf = @dynamic_config.clone
+
+ headers = {
+ UPDATE_OP => {},
+ UPSERT_OP => {},
+ CREATE_OP => {},
+ INDEX_OP => {}
+ }
+
+ tag = chunk.metadata.tag
+
+ chunk.msgpack_each do |time, record|
+ next unless record.is_a? Hash
+
+ begin
+ # evaluate all configurations here
+ DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i|
+ k = DYNAMIC_PARAM_NAMES[i]
+ v = self.instance_variable_get(var)
+ # check here to determine if we should evaluate
+ if dynamic_conf[k] != v
+ value = expand_param(v, tag, time, record)
+ dynamic_conf[k] = value
+ end
+ }
+ # end eval all configs
+ rescue => e
+ # handle dynamic parameters misconfigurations
+ router.emit_error_event(tag, time, record, e)
+ next
+ end
+
+ if eval_or_val(dynamic_conf['logstash_format']) || eval_or_val(dynamic_conf['include_timestamp'])
+ if record.has_key?("@timestamp")
+ time = Time.parse record["@timestamp"]
+ elsif record.has_key?(dynamic_conf['time_key'])
+ time = Time.parse record[dynamic_conf['time_key']]
+ record['@timestamp'] = record[dynamic_conf['time_key']] unless time_key_exclude_timestamp
+ else
+ record.merge!({"@timestamp" => Time.at(time).iso8601(@time_precision)})
+ end
+ end
+
+ if eval_or_val(dynamic_conf['logstash_format'])
+ if eval_or_val(dynamic_conf['utc_index'])
+ target_index = "#{dynamic_conf['logstash_prefix']}#{@logstash_prefix_separator}#{Time.at(time).getutc.strftime("#{dynamic_conf['logstash_dateformat']}")}"
+ else
+ target_index = "#{dynamic_conf['logstash_prefix']}#{@logstash_prefix_separator}#{Time.at(time).strftime("#{dynamic_conf['logstash_dateformat']}")}"
+ end
+ else
+ target_index = dynamic_conf['index_name']
+ end
+
+ # Change target_index to lower-case since Elasticsearch doesn't
+ # allow upper-case characters in index names.
+ target_index = target_index.downcase
+
+ if @include_tag_key
+ record.merge!(dynamic_conf['tag_key'] => tag)
+ end
+
+ if dynamic_conf['hosts']
+ host = dynamic_conf['hosts']
+ else
+ host = "#{dynamic_conf['host']}:#{dynamic_conf['port']}"
+ end
+
+ if @include_index_in_url
+ key = RequestInfo.new(host, target_index)
+ meta = {"_type" => dynamic_conf['type_name']}
+ else
+ key = RequestInfo.new(host, nil)
+ meta = {"_index" => target_index, "_type" => dynamic_conf['type_name']}
+ end
+
+ @meta_config_map.each_pair do |config_name, meta_key|
+ if dynamic_conf[config_name] && accessor = record_accessor_create(dynamic_conf[config_name])
+ if raw_value = accessor.call(record)
+ meta[meta_key] = raw_value
+ end
+ end
+ end
+
+ if @remove_keys
+ @remove_keys.each { |key| record.delete(key) }
+ end
+
+ write_op = dynamic_conf["write_operation"]
+ append_record_to_messages(write_op, meta, headers[write_op], record, bulk_message[key])
+ end
+
+ bulk_message.each do |info, msgs|
+ send_bulk(msgs, info.host, info.index) unless msgs.empty?
+ msgs.clear
+ end
+ end
+
+ def send_bulk(data, host, index)
+ begin
+ response = client(host).bulk body: data, index: index
+ if response['errors']
+ log.error "Could not push log to Elasticsearch: #{response}"
+ end
+ rescue => e
+ @_es = nil if @reconnect_on_error
+ # FIXME: identify unrecoverable errors and raise UnrecoverableRequestFailure instead
+ raise RecoverableRequestFailure, "could not push logs to Elasticsearch cluster (#{connection_options_description(host)}): #{e.message}"
+ end
+ end
+
+ def eval_or_val(var)
+ return var unless var.is_a?(String)
+ eval(var)
+ end
+
+ def expand_param(param, tag, time, record)
+ # check for '${ ... }'
+ # yes => `eval`
+ # no => return param
+ return param if (param =~ /\${.+}/).nil?
+
+ # check for 'tag_parts[]'
+ # separated by a delimiter (default '.')
+ tag_parts = tag.split(@delimiter) unless (param =~ /tag_parts\[.+\]/).nil? || tag.nil?
+
+ # pull out section between ${} then eval
+ inner = param.clone
+ while inner.match(/\${.+}/)
+ to_eval = inner.match(/\${(.+?)}/){$1}
+
+ if !(to_eval =~ /record\[.+\]/).nil? && record.nil?
+ return to_eval
+ elsif !(to_eval =~/tag_parts\[.+\]/).nil? && tag_parts.nil?
+ return to_eval
+ elsif !(to_eval =~/time/).nil? && time.nil?
+ return to_eval
+ else
+ inner.sub!(/\${.+?}/, eval( to_eval ))
+ end
+ end
+ inner
+ end
+
+ def is_valid_expand_param_type(param)
+ return false if [:@buffer_type].include?(param)
+ return self.instance_variable_get(param).is_a?(String)
+ end
+ end
+end