# # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata # # Copyright 2015 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # module Fluent class KubernetesMetadataFilter < Fluent::Filter K8_POD_CA_CERT = 'ca.crt' K8_POD_TOKEN = 'token' Fluent::Plugin.register_filter('kubernetes_metadata', self) config_param :kubernetes_url, :string, default: nil config_param :cache_size, :integer, default: 1000 config_param :cache_ttl, :integer, default: 60 * 60 config_param :watch, :bool, default: true config_param :apiVersion, :string, default: 'v1' config_param :client_cert, :string, default: nil config_param :client_key, :string, default: nil config_param :ca_file, :string, default: nil config_param :verify_ssl, :bool, default: true config_param :tag_to_kubernetes_name_regexp, :string, :default => 'var\.log\.containers\.(?<pod_name>[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_(?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\.log$' config_param :bearer_token_file, :string, default: nil config_param :merge_json_log, :bool, default: true config_param :preserve_json_log, :bool, default: true config_param :include_namespace_id, :bool, default: false config_param :include_namespace_metadata, :bool, default: false config_param :secret_dir, :string, default: '/var/run/secrets/kubernetes.io/serviceaccount' config_param :de_dot, :bool, default: true config_param :de_dot_separator, :string, default: '_' # if reading from the journal, the record will contain the following fields in the following # format: # CONTAINER_NAME=k8s_$containername.$containerhash_$podname_$namespacename_$poduuid_$rand32bitashex # CONTAINER_FULL_ID=dockeridassha256hexvalue config_param :use_journal, :bool, default: false # Field 2 is the container_hash, field 5 is the pod_id, and field 6 is the pod_randhex # I would have included them as named groups, but you can't have named groups that are # non-capturing :P # parse format is defined here: https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/dockertools/docker.go#L317 config_param :container_name_to_kubernetes_regexp, :string, :default => '^(?<name_prefix>[^_]+)_(?<container_name>[^\._]+)(\.(?<container_hash>[^_]+))?_(?<pod_name>[^_]+)_(?<namespace>[^_]+)_[^_]+_[^_]+$' config_param :annotation_match, :array, default: [] def syms_to_strs(hsh) newhsh = {} hsh.each_pair do |kk,vv| if vv.is_a?(Hash) vv = syms_to_strs(vv) end if kk.is_a?(Symbol) newhsh[kk.to_s] = vv else newhsh[kk] = vv end end newhsh end def parse_pod_metadata(pod_object) labels = syms_to_strs(pod_object['metadata']['labels'].to_h) annotations = match_annotations(syms_to_strs(pod_object['metadata']['annotations'].to_h)) if @de_dot self.de_dot!(labels) self.de_dot!(annotations) end kubernetes_metadata = { 'namespace_name' => pod_object['metadata']['namespace'], 'pod_id' => pod_object['metadata']['uid'], 'pod_name' => pod_object['metadata']['name'], 'labels' => labels, 'host' => pod_object['spec']['nodeName'], 'master_url' => @kubernetes_url } kubernetes_metadata['annotations'] = annotations unless annotations.empty? return kubernetes_metadata end def parse_namespace_metadata(namespace_object) labels = syms_to_strs(namespace_object['metadata']['labels'].to_h) annotations = match_annotations(syms_to_strs(namespace_object['metadata']['annotations'].to_h)) if @de_dot self.de_dot!(labels) self.de_dot!(annotations) end kubernetes_metadata = { 'namespace_id' => namespace_object['metadata']['uid'] } kubernetes_metadata['namespace_labels'] = labels unless labels.empty? kubernetes_metadata['namespace_annotations'] = annotations unless annotations.empty? return kubernetes_metadata end def get_pod_metadata(namespace_name, pod_name) begin metadata = @client.get_pod(pod_name, namespace_name) return if !metadata return parse_pod_metadata(metadata) rescue KubeException nil end end def initialize super end def configure(conf) super require 'kubeclient' require 'active_support/core_ext/object/blank' require 'lru_redux' if @de_dot && (@de_dot_separator =~ /\./).present? raise Fluent::ConfigError, "Invalid de_dot_separator: cannot be or contain '.'" end if @include_namespace_id # For compatibility, use include_namespace_metadata instead @include_namespace_metadata = true end if @cache_ttl < 0 @cache_ttl = :none end @cache = LruRedux::TTL::ThreadSafeCache.new(@cache_size, @cache_ttl) if @include_namespace_metadata @namespace_cache = LruRedux::TTL::ThreadSafeCache.new(@cache_size, @cache_ttl) end @tag_to_kubernetes_name_regexp_compiled = Regexp.compile(@tag_to_kubernetes_name_regexp) @container_name_to_kubernetes_regexp_compiled = Regexp.compile(@container_name_to_kubernetes_regexp) # Use Kubernetes default service account if we're in a pod. if @kubernetes_url.nil? env_host = ENV['KUBERNETES_SERVICE_HOST'] env_port = ENV['KUBERNETES_SERVICE_PORT'] if env_host.present? && env_port.present? @kubernetes_url = "https://#{env_host}:#{env_port}/api" end end # Use SSL certificate and bearer token from Kubernetes service account. if Dir.exist?(@secret_dir) ca_cert = File.join(@secret_dir, K8_POD_CA_CERT) pod_token = File.join(@secret_dir, K8_POD_TOKEN) if !@ca_file.present? and File.exist?(ca_cert) @ca_file = ca_cert end if !@bearer_token_file.present? and File.exist?(pod_token) @bearer_token_file = pod_token end end if @kubernetes_url.present? ssl_options = { client_cert: @client_cert.present? ? OpenSSL::X509::Certificate.new(File.read(@client_cert)) : nil, client_key: @client_key.present? ? OpenSSL::PKey::RSA.new(File.read(@client_key)) : nil, ca_file: @ca_file, verify_ssl: @verify_ssl ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE } auth_options = {} if @bearer_token_file.present? bearer_token = File.read(@bearer_token_file) auth_options[:bearer_token] = bearer_token end @client = Kubeclient::Client.new @kubernetes_url, @apiVersion, ssl_options: ssl_options, auth_options: auth_options begin @client.api_valid? rescue KubeException => kube_error raise Fluent::ConfigError, "Invalid Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}: #{kube_error.message}" end if @watch thread = Thread.new(self) { |this| this.start_watch } thread.abort_on_exception = true if @include_namespace_metadata namespace_thread = Thread.new(self) { |this| this.start_namespace_watch } namespace_thread.abort_on_exception = true end end end if @use_journal @merge_json_log_key = 'MESSAGE' self.class.class_eval { alias_method :filter_stream, :filter_stream_from_journal } else @merge_json_log_key = 'log' self.class.class_eval { alias_method :filter_stream, :filter_stream_from_files } end @annotations_regexps = [] @annotation_match.each do |regexp| begin @annotations_regexps << Regexp.compile(regexp) rescue RegexpError => e log.error "Error: invalid regular expression in annotation_match: #{e}" end end end def get_metadata_for_record(namespace_name, pod_name, container_name) metadata = { 'container_name' => container_name, 'namespace_name' => namespace_name, 'pod_name' => pod_name, } if @kubernetes_url.present? cache_key = "#{namespace_name}_#{pod_name}" this = self pod_metadata = @cache.getset(cache_key) { md = this.get_pod_metadata( namespace_name, pod_name, ) md } metadata.merge!(pod_metadata) if pod_metadata if @include_namespace_metadata namespace_metadata = @namespace_cache.getset(namespace_name) { begin namespace = @client.get_namespace(namespace_name) if namespace parse_namespace_metadata(namespace) end rescue KubeException nil end } metadata.merge!(namespace_metadata) if namespace_metadata end end metadata end def filter_stream(tag, es) es end def filter_stream_from_files(tag, es) new_es = MultiEventStream.new match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) if match_data metadata = { 'docker' => { 'container_id' => match_data['docker_id'] }, 'kubernetes' => get_metadata_for_record( match_data['namespace'], match_data['pod_name'], match_data['container_name'], ), } end es.each { |time, record| record = merge_json_log(record) if @merge_json_log record = record.merge(metadata) if metadata new_es.add(time, record) } new_es end def filter_stream_from_journal(tag, es) new_es = MultiEventStream.new es.each { |time, record| record = merge_json_log(record) if @merge_json_log metadata = nil if record.has_key?('CONTAINER_NAME') && record.has_key?('CONTAINER_ID_FULL') metadata = record['CONTAINER_NAME'].match(@container_name_to_kubernetes_regexp_compiled) do |match_data| metadata = { 'docker' => { 'container_id' => record['CONTAINER_ID_FULL'] }, 'kubernetes' => get_metadata_for_record( match_data['namespace'], match_data['pod_name'], match_data['container_name'], ) } metadata end unless metadata log.debug "Error: could not match CONTAINER_NAME from record #{record}" end elsif record.has_key?('CONTAINER_NAME') && record['CONTAINER_NAME'].start_with?('k8s_') log.debug "Error: no container name and id in record #{record}" end if metadata record = record.merge(metadata) end new_es.add(time, record) } new_es end def merge_json_log(record) if record.has_key?(@merge_json_log_key) log = record[@merge_json_log_key].strip if log[0].eql?('{') && log[-1].eql?('}') begin record = JSON.parse(log).merge(record) unless @preserve_json_log record.delete(@merge_json_log_key) end rescue JSON::ParserError end end end record end def de_dot!(h) h.keys.each do |ref| if h[ref] && ref =~ /\./ v = h.delete(ref) newref = ref.to_s.gsub('.', @de_dot_separator) h[newref] = v end end end def match_annotations(annotations) result = {} @annotations_regexps.each do |regexp| annotations.each do |key, value| if ::Fluent::StringUtil.match_regexp(regexp, key.to_s) result[key] = value end end end result end def start_watch begin resource_version = @client.get_pods.resourceVersion watcher = @client.watch_pods(resource_version) rescue Exception => e message = "Exception encountered fetching metadata from Kubernetes API endpoint: #{e.message}" if e.respond_to?(:response) message += " (#{e.response})" end raise Fluent::ConfigError, message end watcher.each do |notice| case notice.type when 'MODIFIED' cache_key = "#{notice.object['metadata']['namespace']}_#{notice.object['metadata']['name']}" cached = @cache[cache_key] if cached @cache[cache_key] = parse_pod_metadata(notice.object) end when 'DELETED' cache_key = "#{notice.object['metadata']['namespace']}_#{notice.object['metadata']['name']}" @cache.delete(cache_key) else # Don't pay attention to creations, since the created pod may not # end up on this node. end end end def start_namespace_watch resource_version = @client.get_namespaces.resourceVersion watcher = @client.watch_namespaces(resource_version) watcher.each do |notice| puts notice case notice.type when 'MODIFIED' cache_key = notice.object['metadata']['name'] cached = @namespace_cache[cache_key] if cached @namespace_cache[cache_key] = parse_namespace_metadata(notice.object) end when 'DELETED' @namespace_cache.delete(notice.object['metadata']['name']) else # Don't pay attention to creations, since the created namespace may not # be used by any pod on this node. end end end end end