require "fluent/plugin/filter"

module Fluent
  module Plugin
    # Fluentd filter that enriches records with Kubernetes pod metadata.
    #
    # Each record's 'uuid' value is looked up in an in-memory cache keyed by
    # pod UID. The cache is filled from the kube-apiserver pod list; on a cache
    # miss the list is fetched again (at most once per event stream). Matching
    # records get a 'kubernetes' key merged in; records with no matching pod
    # are passed through unchanged.
    class GeovisFilter < Fluent::Plugin::Filter
      Fluent::Plugin.register_filter("geovis", self)

      # File names looked up inside +secret_dir+.
      K8_POD_CA_CERT = 'ca.crt'
      K8_POD_TOKEN = 'token'

      desc 'use the k8s insecure port,like 8080,if true then init the http kubeclient'
      config_param :insecure, :bool, default: false
      desc 'the kube-apiserver restful url'
      config_param :kubernetes_url, :string, default: nil
      desc 'the k8s serviceAccount for comunicating with kube-apiserver'
      config_param :secret_dir, :string, default: '/var/run/secrets/kubernetes.io/serviceaccount'
      config_param :apiVersion, :string, default: 'v1'
      config_param :client_cert, :string, default: nil
      config_param :client_key, :string, default: nil
      config_param :ca_file, :string, default: nil
      config_param :verify_ssl, :bool, default: true
      config_param :bearer_token_file, :string, default: nil

      # Resolves the API endpoint and credentials, builds the Kubernetes
      # client, validates the endpoint and primes the pod cache.
      #
      # When no URL is configured and none can be derived from the environment,
      # no client is created and the filter acts as a pass-through.
      #
      # @param conf the Fluentd configuration element for this plugin
      # @raise [Fluent::ConfigError] when the API validation call raises KubeException
      def configure(conf)
        super

        def log.trace?
          level == Fluent::Log::LEVEL_TRACE
        end

        # Required here rather than at file top, as in the original code.
        require 'kubeclient'

        log.debug "Geovis Filter configure"

        discover_kubernetes_url if @kubernetes_url.nil?
        discover_service_account_credentials

        # Always start with an empty cache so filter_stream never sees nil,
        # even when no client can be created below.
        @podsHash = {}
        return if @kubernetes_url.nil?

        @client = insecure ? build_insecure_client : build_secure_client

        begin
          @client.api_valid?
        # NOTE(review): KubeException is not defined in this file; presumably it
        # comes from kubeclient - confirm it exists in the version in use.
        rescue KubeException => kube_error
          raise Fluent::ConfigError, "Invalid Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}: #{kube_error.message}"
        end

        fetch_pods
      end

      # Adds pod metadata to every record whose 'uuid' matches a known pod.
      #
      # @param tag the event tag (unused)
      # @param es the incoming event stream
      # @return the original stream when it is empty or not a
      #   Fluent::EventStream, otherwise a new Fluent::MultiEventStream
      def filter_stream(tag, es)
        log.debug "Geovis Filter filter"
        return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)

        new_es = Fluent::MultiEventStream.new
        # The pod list is re-fetched at most once per stream; a chunk full of
        # unknown uuids must not trigger one API call per record.
        refreshed = false

        es.each do |time, record|
          uuid = record['uuid']
          metadata = @podsHash[uuid]

          if metadata
            log.debug "find pod for uuid: #{uuid}"
          else
            unless refreshed
              fetch_pods
              refreshed = true
              metadata = @podsHash[uuid]
            end
            log.error "pod does not exist: #{uuid}" unless metadata
          end

          record = record.merge(metadata) if metadata
          new_es.add(time, record)
        end

        new_es
      end

      # Fetches the current pod list and merges it into the cache.
      #
      # Freshly fetched metadata replaces cached metadata for the same pod UID;
      # cached entries for pods missing from the new list are kept.
      # Does nothing when no client was created during configure.
      #
      # @return [Hash, nil] the updated cache, or nil when there is no client
      def fetch_pods()
        log.debug "Geovis Filter fetch_pods"
        return if @client.nil?

        fresh_pods = {}
        @client.get_pods.each do |pod_object|
          kubernetes_metadata = parse_pod_metadata(pod_object)
          fresh_pods[kubernetes_metadata['kubernetes']['pod_id']] = kubernetes_metadata
        end

        @podsHash = (@podsHash || {}).merge(fresh_pods)
      end

      # Builds the metadata hash that is merged into matching records.
      #
      # @param pod_object a pod entity as returned by the client's pod list
      # @return [Hash] a hash with the single key 'kubernetes'
      def parse_pod_metadata(pod_object)
        labels = syms_to_strs(pod_object['metadata']['labels'].to_h)

        # collect container informations
        container_meta = {}
        begin
          pod_object['status']['containerStatuses'].each do |container_status|
            # get plain container id (eg. docker://hash -> hash)
            container_id = container_status['containerID'].sub(%r{\A[-_a-zA-Z0-9]+://}, '')
            container_meta[container_id] = {
              'name' => container_status['name'],
              'image' => container_status['image'],
              'image_id' => container_status['imageID']
            }
          end
        rescue
          # Best effort: a failure here (e.g. missing container statuses or a
          # missing container ID) must not prevent the pod from being cached.
          log.debug("parsing container meta information failed for: #{pod_object['metadata']['namespace']}/#{pod_object['metadata']['name']} ")
        end

        {
          'kubernetes' => {
            'namespace_name' => pod_object['metadata']['namespace'],
            'pod_id' => pod_object['metadata']['uid'],
            'pod_name' => pod_object['metadata']['name'],
            'containers' => syms_to_strs(container_meta),
            'labels' => labels,
            'host' => pod_object['spec']['nodeName'],
            'master_url' => @kubernetes_url
          }
        }
      end

      # Returns a copy of +hsh+ with Symbol keys converted to Strings,
      # recursing into nested Hash values. Other keys and values are kept.
      #
      # @param hsh [Hash] the hash to convert
      # @return [Hash] a new hash; +hsh+ itself is not modified
      def syms_to_strs(hsh)
        converted = {}
        hsh.each_pair do |key, value|
          value = syms_to_strs(value) if value.is_a?(Hash)
          converted[key.is_a?(Symbol) ? key.to_s : key] = value
        end
        converted
      end

      private

      # Derives the API URL from KUBERNETES_SERVICE_HOST/PORT when both are set.
      def discover_kubernetes_url
        log.debug "Kubernetes URL is not set - inspecting environment"

        env_host = ENV['KUBERNETES_SERVICE_HOST']
        env_port = ENV['KUBERNETES_SERVICE_PORT']
        return if env_host.nil? || env_port.nil?

        scheme = insecure ? 'http' : 'https'
        @kubernetes_url = "#{scheme}://#{env_host}:#{env_port}/api"
        log.debug "Kubernetes URL is now '#{@kubernetes_url}'"
      end

      # Falls back to the CA certificate and token found in +secret_dir+ for
      # whichever of ca_file / bearer_token_file was not configured explicitly.
      def discover_service_account_credentials
        return unless Dir.exist?(@secret_dir)

        log.debug "Found directory with secrets: #{@secret_dir}"
        ca_cert = File.join(@secret_dir, K8_POD_CA_CERT)
        pod_token = File.join(@secret_dir, K8_POD_TOKEN)

        if @ca_file.nil? && File.exist?(ca_cert)
          log.debug "Found CA certificate: #{ca_cert}"
          @ca_file = ca_cert
        end

        if @bearer_token_file.nil? && File.exist?(pod_token)
          log.debug "Found pod token: #{pod_token}"
          @bearer_token_file = pod_token
        end
      end

      # Builds a client without SSL or auth options.
      def build_insecure_client
        log.debug "Creating insecure K8S client"
        Kubeclient::Client.new @kubernetes_url, @apiVersion
      end

      # Builds a client using the configured certificates and bearer token.
      def build_secure_client
        ssl_options = {
          client_cert: @client_cert && OpenSSL::X509::Certificate.new(File.read(@client_cert)),
          client_key: @client_key && OpenSSL::PKey::RSA.new(File.read(@client_key)),
          ca_file: @ca_file,
          verify_ssl: @verify_ssl ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE
        }

        auth_options = {}
        auth_options[:bearer_token] = File.read(@bearer_token_file) unless @bearer_token_file.nil?

        log.debug "Creating secure K8S client"
        Kubeclient::Client.new @kubernetes_url, @apiVersion,
                               ssl_options: ssl_options,
                               auth_options: auth_options
      end
    end
  end
end