require "fluent/plugin/filter"
require "openssl"

module Fluent
  module Plugin
    # Fluentd filter registered under the name "geovis".
    #
    # At configure time it optionally builds a Kubernetes API client
    # (Kubeclient) from explicit parameters, or from the environment variables
    # and service-account files checked below. At filter time every event is
    # copied unchanged into a new event stream; the record's 'uuid' field is
    # only reported at trace log level.
    class GeovisFilter < Fluent::Plugin::Filter
      Fluent::Plugin.register_filter("geovis", self)

      # File names looked up inside +secret_dir+.
      K8_POD_CA_CERT = 'ca.crt'
      K8_POD_TOKEN = 'token'

      desc 'use the k8s insecure port,like 8080,if true then init the http kubeclient'
      config_param :insecure, :bool, default: false
      desc 'the kube-apiserver restful url'
      config_param :kubernetes_url, :string, default: nil
      desc 'the k8s serviceAccount for comunicating with kube-apiserver'
      config_param :secret_dir, :string, default: '/var/run/secrets/kubernetes.io/serviceaccount'
      config_param :apiVersion, :string, default: 'v1'
      config_param :client_cert, :string, default: nil
      config_param :client_key, :string, default: nil
      config_param :ca_file, :string, default: nil
      config_param :verify_ssl, :bool, default: true
      config_param :bearer_token_file, :string, default: nil

      # Applies the plugin configuration and, when a Kubernetes URL is known
      # (configured or discovered), creates and probes the API client.
      #
      # @param conf the configuration element passed through to +super+
      # @raise [Fluent::ConfigError] when probing the API raises KubeException
      def configure(conf)
        super

        # Adds a trace? predicate to this plugin's logger object.
        def log.trace?
          level == Fluent::Log::LEVEL_TRACE
        end

        # Loaded here rather than at file load time, as in the original code.
        require 'kubeclient'

        log.info "==================================FetchpodnameFilter configure"

        detect_kubernetes_url if @kubernetes_url.nil?
        detect_service_account_credentials if Dir.exist?(@secret_dir)
        return if @kubernetes_url.nil?

        @client = build_client
        validate_api
      end

      # Copies every event of +es+ into a new MultiEventStream without
      # modifying time or record.
      #
      # @param tag the event tag (not used by this method)
      # @param es the incoming event stream
      # @return +es+ itself when it is empty or not a Fluent::EventStream,
      #   otherwise a new Fluent::MultiEventStream holding the same events
      def filter_stream(tag, es)
        # Diagnostics go through the plugin logger at trace level instead of
        # being printed to stdout for every stream and every record.
        log.trace "==================================FetchpodnameFilter filter"
        return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)

        new_es = Fluent::MultiEventStream.new
        es.each do |time, record|
          log.trace "record uuid: #{record['uuid']}"
          new_es.add(time, record)
        end
        new_es
      end

      private

      # Derives @kubernetes_url from KUBERNETES_SERVICE_HOST/PORT when both are set.
      def detect_kubernetes_url
        log.debug "Kubernetes URL is not set - inspecting environment"
        env_host = ENV['KUBERNETES_SERVICE_HOST']
        env_port = ENV['KUBERNETES_SERVICE_PORT']
        return if env_host.nil? || env_port.nil?

        scheme = @insecure ? 'http' : 'https'
        @kubernetes_url = "#{scheme}://#{env_host}:#{env_port}/api"
        log.debug "Kubernetes URL is now '#{@kubernetes_url}'"
      end

      # Fills @ca_file / @bearer_token_file from files in @secret_dir, but only
      # for values that were not configured explicitly.
      def detect_service_account_credentials
        log.debug "Found directory with secrets: #{@secret_dir}"
        ca_cert = File.join(@secret_dir, K8_POD_CA_CERT)
        pod_token = File.join(@secret_dir, K8_POD_TOKEN)

        if @ca_file.nil? && File.exist?(ca_cert)
          log.debug "Found CA certificate: #{ca_cert}"
          @ca_file = ca_cert
        end

        if @bearer_token_file.nil? && File.exist?(pod_token)
          log.debug "Found pod token: #{pod_token}"
          @bearer_token_file = pod_token
        end
      end

      # Builds the Kubeclient client: without SSL/auth options when @insecure
      # is set, otherwise with SSL options and an optional bearer token.
      def build_client
        return Kubeclient::Client.new(@kubernetes_url, @apiVersion) if @insecure

        ssl_options = {
          # Certificate and key are optional; the entries stay nil when unset.
          client_cert: (OpenSSL::X509::Certificate.new(File.read(@client_cert)) if @client_cert),
          client_key: (OpenSSL::PKey::RSA.new(File.read(@client_key)) if @client_key),
          ca_file: @ca_file,
          verify_ssl: @verify_ssl ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE
        }

        auth_options = {}
        auth_options[:bearer_token] = File.read(@bearer_token_file) if @bearer_token_file

        log.debug "Creating K8S client"
        Kubeclient::Client.new @kubernetes_url, @apiVersion,
                               ssl_options: ssl_options,
                               auth_options: auth_options
      end

      # Probes the API endpoint and converts a KubeException into a ConfigError.
      # NOTE(review): the return value of api_valid? is ignored, as in the
      # original code - confirm whether a falsy result should also be rejected.
      def validate_api
        @client.api_valid?
      rescue KubeException => kube_error
        raise Fluent::ConfigError, "Invalid Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}: #{kube_error.message}"
      end
    end
  end
end