# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'time'
require 'fluent/plugin/input'
require 'kubeclient'
require 'multi_json'

module Fluent
  module Plugin
    # Fluentd input plugin that periodically queries the Kubernetes API for
    # pods and nodes, and the kubelet summary endpoint (through the API server
    # node proxy), and emits aggregated CPU / memory metrics at container,
    # pod, namespace, node and cluster level.
    #
    # CPU values are expressed in millicores and memory values in Mi.
    class KubernetesMetricsAggregatorInput < Fluent::Plugin::Input
      # Accumulators shared between the three scrapers. They are filled by one
      # scrape and emptied once the corresponding metrics have been emitted.
      @@namespace_usage_metrics_map = {}
      @@node_requests_limits_metrics_map = {}
      @@namespace_resource_usage_metrics_map = {}
      @@node_resource_usage_metrics_map = {}

      # Parsing helpers for Kubernetes resource quantity strings such as
      # "250m", "0.5", "128Mi" or "1G".
      module ResourceQuantity
        BYTES_PER_MEBIBYTE = 1024**2

        # Power-of-two suffixes, expressed as a multiplier that converts to Mi.
        BINARY_MEMORY_MULTIPLIERS = {
          'Ki' => 1.0 / 1024,
          'Mi' => 1,
          'Gi' => 1024,
          'Ti' => 1024**2,
          'Pi' => 1024**3,
          'Ei' => 1024**4
        }.freeze

        # Power-of-ten suffixes, expressed as a multiplier that converts to Mi.
        # Kubernetes spells kilo as a lowercase "k"; the uppercase "K" is kept
        # because earlier versions of this plugin accepted it.
        DECIMAL_MEMORY_MULTIPLIERS = {
          'm' => 1e-3 / BYTES_PER_MEBIBYTE,
          'k' => 1e3 / BYTES_PER_MEBIBYTE,
          'K' => 1e3 / BYTES_PER_MEBIBYTE,
          'M' => 1e6 / BYTES_PER_MEBIBYTE,
          'G' => 1e9 / BYTES_PER_MEBIBYTE,
          'T' => 1e12 / BYTES_PER_MEBIBYTE,
          'P' => 1e15 / BYTES_PER_MEBIBYTE,
          'E' => 1e18 / BYTES_PER_MEBIBYTE
        }.freeze

        # A quantity without a suffix is a number of bytes.
        PLAIN_BYTES_MULTIPLIER = 1.0 / BYTES_PER_MEBIBYTE

        module_function

        # Extracts the numeric part of a quantity.
        #
        # @param quantity [#to_s] e.g. "250m", "1.5Gi"
        # @return [Integer, Float] Integer when the quantity has no decimal
        #   point, Float otherwise (so "0.5" is no longer truncated to 0)
        def numeric_value(quantity)
          digits = quantity.to_s.delete('^0-9.')
          digits.include?('.') ? digits.to_f : digits.to_i
        end

        # @param cpu [#to_s] CPU quantity
        # @return [Integer] 1 when the quantity is already in millicores
        #   ("m" suffix), 1000 when it is expressed in cores
        def cpu_multiplier(cpu)
          cpu.to_s.end_with?('m') ? 1 : 1000
        end

        # @param memory [#to_s] memory quantity
        # @return [Numeric] factor converting the numeric part to Mi
        def memory_multiplier(memory)
          text = memory.to_s
          # Two-letter binary suffixes must be tried first: "Mi" also ends
          # with a character that is not a decimal suffix, while "M" alone is.
          BINARY_MEMORY_MULTIPLIERS.fetch(text[-2..]) do
            DECIMAL_MEMORY_MULTIPLIERS.fetch(text[-1], PLAIN_BYTES_MULTIPLIER)
          end
        end

        # @param cpu [#to_s] CPU quantity
        # @return [Numeric] the quantity in millicores
        def cpu_millicores(cpu)
          numeric_value(cpu) * cpu_multiplier(cpu)
        end

        # @param memory [#to_s] memory quantity
        # @return [Numeric] the quantity in Mi
        def memory_mebibytes(memory)
          numeric_value(memory) * memory_multiplier(memory)
        end
      end

      # Running total of CPU / memory limits and requests
      # (CPU in millicores, memory in Mi).
      class UsageMetricsUnit
        attr_reader :cpu_limit, :cpu_request, :memory_limit, :memory_request

        def initialize
          @cpu_limit = 0
          @cpu_request = 0
          @memory_limit = 0
          @memory_request = 0
        end

        # Parses the given Kubernetes quantity strings and adds them to the
        # running totals.
        #
        # @param cpu_limit [String] e.g. "500m" or "0.5"
        # @param cpu_request [String]
        # @param memory_limit [String] e.g. "128Mi"
        # @param memory_request [String]
        def add_usage_metrics(cpu_limit, cpu_request, memory_limit, memory_request)
          @cpu_limit += ResourceQuantity.cpu_millicores(cpu_limit)
          @cpu_request += ResourceQuantity.cpu_millicores(cpu_request)
          @memory_limit += ResourceQuantity.memory_mebibytes(memory_limit)
          @memory_request += ResourceQuantity.memory_mebibytes(memory_request)
        end

        # Adds the totals of another unit to this one. Working on the numbers
        # directly avoids formatting them as quantity strings and parsing them
        # again, which loses fractional values.
        #
        # @param other [UsageMetricsUnit]
        def add_usage_unit(other)
          @cpu_limit += other.cpu_limit
          @cpu_request += other.cpu_request
          @memory_limit += other.memory_limit
          @memory_request += other.memory_request
        end

        # Not called anywhere in this file; kept for backward compatibility.
        # Stores the CPU multiplier in @cpu_mult and returns the numeric part.
        def get_cpu_value_and_multiplier(cpu)
          # m cpu is assumed standard
          @cpu_mult = ResourceQuantity.cpu_multiplier(cpu)
          ResourceQuantity.numeric_value(cpu)
        end

        # @return [Integer, Float] numeric part of a quantity string
        def get_cpu_or_memory_value(resource)
          ResourceQuantity.numeric_value(resource)
        end

        # @return [Integer] factor converting a CPU quantity to millicores
        def get_cpu_mult(cpu)
          ResourceQuantity.cpu_multiplier(cpu)
        end

        # https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-memory
        # 1 Ki = 1024 bytes
        # 1 k = 1000 bytes = 1000/1024 Ki = 1000/(1024*1024) Mi
        #
        # @return [Numeric] factor converting a memory quantity to Mi
        def get_memory_mult(memory)
          ResourceQuantity.memory_multiplier(memory)
        end
      end

      # Running total of measured CPU / memory usage.
      class ResourceUsageMetricsUnit
        attr_reader :cpu_usage, :memory_usage

        def initialize
          @cpu_usage = 0
          @memory_usage = 0
        end

        # @param cpu_usage [Numeric]
        # @param memory_usage [Numeric]
        def add_resource_usage_metrics(cpu_usage, memory_usage)
          @cpu_usage += cpu_usage
          @memory_usage += memory_usage
        end
      end

      Fluent::Plugin.register_input('kubernetes_metrics_aggregator', self)

      helpers :timer

      desc 'URL of the kubernetes API server.'
      config_param :kubernetes_url, :string, default: nil

      desc 'The port that kubelet is listening to.'
      config_param :kubelet_port, :integer, default: 10_250

      desc 'The tag of the event.'
      config_param :tag, :string, default: 'kubernetes.metrics.*'

      desc 'How often it pulls metrics.'
      config_param :interval, :time, default: "15s"

      desc 'Path to a kubeconfig file points to a cluster the plugin should collect metrics from. Mostly useful when running fluentd outside of the cluster. When `kubeconfig` is set, `kubernetes_url`, `client_cert`, `client_key`, `ca_file`, `insecure_ssl`, `bearer_token_file`, and `secret_dir` will all be ignored.'
      config_param :kubeconfig, :string, default: nil

      desc 'Path to the certificate file for this client.'
      config_param :client_cert, :string, default: nil

      desc 'Path to the private key file for this client.'
      config_param :client_key, :string, default: nil

      desc 'Path to the CA file.'
      config_param :ca_file, :string, default: nil

      desc "If `insecure_ssl` is set to `true`, it won't verify apiserver's certificate."
      config_param :insecure_ssl, :bool, default: false

      desc 'Path to the file contains the API token. By default it reads from the file "token" in the `secret_dir`.'
      config_param :bearer_token_file, :string, default: nil

      desc "Path of the location where pod's service account's credentials are stored."
      config_param :secret_dir, :string, default: '/var/run/secrets/kubernetes.io/serviceaccount'

      desc 'The name of the cluster, where the plugin is deployed.'
      config_param :cluster_name, :string, default: 'cluster_name'

      # Creates the mutexes guarding the shared accumulators, splits the tag
      # template and builds the Kubernetes client.
      def configure(conf)
        super
        @mutex_node_req_lim = Mutex.new
        @mutex_node_res_usage = Mutex.new
        parse_tag
        initialize_client
      end

      # Schedules the three scrapers, all on the configured interval.
      def start
        super

        timer_execute :limits_request_scraper, @interval, &method(:scrape_limits_requests_metrics)
        timer_execute :node_scraper, @interval, &method(:scrape_node_metrics)
        timer_execute :resource_usage_scraper, @interval, &method(:scrape_resource_usage_metrics)
      end

      def close
        # @watchers is never assigned in this file, so this is normally a no-op.
        @watchers.each(&:finish) if @watchers

        super
      end

      private

      # Splits a tag template such as "kubernetes.metrics.*" around the "*".
      def parse_tag
        @tag_prefix, @tag_suffix = @tag.split('*') if @tag.include?('*')
      end

      # @param item_name [String] value substituted for the "*" of the tag
      # @return [String] the tag for the item, or the configured tag itself
      #   when it contains no "*" (callers must not mutate the result)
      def generate_tag(item_name)
        return @tag unless @tag_prefix

        [@tag_prefix, item_name, @tag_suffix].join
      end

      # Builds the client from the current context of the kubeconfig file.
      def init_with_kubeconfig(options = {})
        config = Kubeclient::Config.read @kubeconfig
        current_context = config.context

        @client = Kubeclient::Client.new(
          current_context.api_endpoint,
          current_context.api_version,
          options.merge(
            ssl_options: current_context.ssl_options,
            auth_options: current_context.auth_options
          )
        )
      end

      # Builds the client from `kubernetes_url` or, when that is not set, from
      # the KUBERNETES_SERVICE_HOST / KUBERNETES_SERVICE_PORT environment
      # variables. Falls back to the CA file and token found in `secret_dir`.
      #
      # @raise [Fluent::ConfigError] when no URL can be determined or the API
      #   validity check raises
      def init_without_kubeconfig(_options = {})
        kubernetes_url_final = nil
        # mostly borrowed from Fluentd Kubernetes Metadata Filter Plugin
        if @kubernetes_url.nil?
          # Use Kubernetes default service account if we're in a pod.
          env_host = ENV['KUBERNETES_SERVICE_HOST']
          env_port = ENV['KUBERNETES_SERVICE_PORT']
          if env_host && env_port
            kubernetes_url_final = "https://#{env_host}:#{env_port}/api/"
          end
        else
          kubernetes_url_final = "https://#{@kubernetes_url}:#{@kubelet_port}/api/"
        end

        raise Fluent::ConfigError, 'kubernetes url is not set in configuration, or environment variables' unless kubernetes_url_final

        # Use SSL certificate and bearer token from Kubernetes service account.
        if Dir.exist?(@secret_dir)
          secret_ca_file = File.join(@secret_dir, 'ca.crt')
          secret_token_file = File.join(@secret_dir, 'token')

          @ca_file = secret_ca_file if @ca_file.nil? && File.exist?(secret_ca_file)
          @bearer_token_file = secret_token_file if @bearer_token_file.nil? && File.exist?(secret_token_file)
        end

        ssl_options = {
          client_cert: @client_cert && OpenSSL::X509::Certificate.new(File.read(@client_cert)),
          client_key: @client_key && OpenSSL::PKey::RSA.new(File.read(@client_key)),
          ca_file: @ca_file,
          verify_ssl: @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
        }

        auth_options = {}
        auth_options[:bearer_token] = File.read(@bearer_token_file) if @bearer_token_file

        api_version = 'v1'
        @client = Kubeclient::Client.new(
          kubernetes_url_final, api_version,
          ssl_options: ssl_options,
          auth_options: auth_options
        )

        begin
          @client.api_valid?
        rescue KubeException => kube_error
          raise Fluent::ConfigError, "Invalid Kubernetes API #{api_version} endpoint #{kubernetes_url_final}: #{kube_error.message}"
        end
      end

      def initialize_client
        options = {
          timeouts: {
            open: 10,
            read: nil
          }
        }

        if @kubeconfig.nil?
          init_without_kubeconfig options
        else
          init_with_kubeconfig options
        end
      end

      # @param metric_time [String] ISO 8601 timestamp
      # @return [Fluent::EventTime]
      def parse_time(metric_time)
        Fluent::EventTime.from_time Time.iso8601(metric_time)
      end

      # Prefixes every upper-case ASCII letter with "_" and downcases it.
      def underscore(camlcase)
        camlcase.gsub(/[A-Z]/) { |c| "_#{c.downcase}" }
      end

      def get_cpu_mult(cpu)
        ResourceQuantity.cpu_multiplier(cpu)
      end

      # @return [Numeric] CPU quantity converted to millicores
      def get_cpu_value(resource)
        ResourceQuantity.cpu_millicores(resource)
      end

      def get_memory_mult(memory)
        ResourceQuantity.memory_multiplier(memory)
      end

      # @return [Numeric] memory quantity converted to Mi
      def get_memory_value(resource)
        ResourceQuantity.memory_mebibytes(resource)
      end

      # Emits the four limit / request events of a UsageMetricsUnit.
      def emit_limits_requests_metrics(tag, scraped_at, labels, limits_requests_metric)
        event_time = Fluent::EventTime.from_time(scraped_at)
        router.emit "#{tag}.cpu.limit", event_time, labels.merge('value' => limits_requests_metric.cpu_limit)
        router.emit "#{tag}.cpu.request", event_time, labels.merge('value' => limits_requests_metric.cpu_request)
        router.emit "#{tag}.memory.limit", event_time, labels.merge('value' => limits_requests_metric.memory_limit)
        router.emit "#{tag}.memory.request", event_time, labels.merge('value' => limits_requests_metric.memory_request)
      end

      # Emits the two usage events of a ResourceUsageMetricsUnit.
      def emit_resource_usage_metrics(tag, scraped_at, labels, resource_usage_metric)
        event_time = Fluent::EventTime.from_time(scraped_at)
        router.emit "#{tag}.cpu.usage", event_time, labels.merge('value' => resource_usage_metric.cpu_usage)
        router.emit "#{tag}.memory.usage", event_time, labels.merge('value' => resource_usage_metric.memory_usage)
      end

      # Emits one node-level event. The tag is built with interpolation so
      # that the string returned by generate_tag (which is @tag itself when
      # the tag has no "*") is never modified in place.
      def emit_node_metric(metric, node_name, value)
        router.emit "#{generate_tag('node')}.#{metric}",
                    Fluent::EventTime.from_time(@scraped_node_at),
                    'node' => node_name, 'value' => value
      end

      # @return the memoized REST resource for the pods endpoint
      def limits_requests_api
        @limits_requests_api ||=
          begin
            @client.discover unless @client.discovered
            @client.rest_client['/pods'].tap do |endpoint|
              log.debug("Use URL #{endpoint.url} for scraping limits requests metrics")
            end
          end
      end

      def scrape_limits_requests_metrics
        response = limits_requests_api.get(@client.headers)
        handle_limits_requests_res(response)
      rescue StandardError => e
        log.error "Failed to get limit metrics, error=#{e.message}, #{e.inspect}"
        log.error_backtrace
      end

      # This method is used to handle responses from the kube apiserver api
      def handle_limits_requests_res(response)
        # Checking response codes only for a successful GET request viz., 2XX codes
        if response.code.between?(200, 299)
          @scraped_at = Time.now
          process_limits_requests_res MultiJson.load(response.body)
        else
          log.error "Expected 2xx from pods API, but got #{response.code}. Response body = #{response.body}"
        end
      rescue StandardError => e
        log.error "Failed to scrape limit metrics, error=#{e.message}, #{e.inspect}"
        log.error_backtrace
      end

      # Reads the limits and requests of one container, substituting '0' for
      # anything that is not set.
      #
      # @return [Array<String>] cpu limit, cpu request, memory limit, memory
      #   request, in the order UsageMetricsUnit#add_usage_metrics expects
      def container_limits_requests(container_json)
        resources = container_json['resources'] || {}
        limits = resources['limits'] || {}
        requests = resources['requests'] || {}

        [
          limits['cpu'] || '0',
          requests['cpu'] || '0',
          limits['memory'] || '0',
          requests['memory'] || '0'
        ]
      end

      # Emits limit / request metrics for every container and pod of the pod
      # list, then the namespace and cluster totals. Per-node totals are left
      # in @@node_requests_limits_metrics_map for process_node_response.
      def process_limits_requests_res(response)
        @scraped_at = Time.now
        @mutex_node_req_lim.synchronize do
          Array(response['items']).each do |pod_json|
            pod_name = pod_json['metadata']['name']
            pod_namespace = pod_json['metadata']['namespace']
            pod_node_name = pod_json['spec']['nodeName']
            pod_usage_metrics = UsageMetricsUnit.new

            Array(pod_json['spec']['containers']).each do |container_json|
              quantities = container_limits_requests(container_json)
              container_usage_metrics = UsageMetricsUnit.new
              container_usage_metrics.add_usage_metrics(*quantities)

              container_labels = {
                'pod-name' => pod_name,
                'namespace' => pod_namespace,
                'name' => container_json['name'],
                'image' => container_json['image'],
                'node' => pod_node_name
              }
              emit_limits_requests_metrics(generate_tag('container'), @scraped_at, container_labels, container_usage_metrics)

              pod_usage_metrics.add_usage_metrics(*quantities)
            end

            pod_labels = { 'name' => pod_name, 'namespace' => pod_namespace, 'node' => pod_node_name }
            emit_limits_requests_metrics(generate_tag('pod'), @scraped_at, pod_labels, pod_usage_metrics)

            (@@namespace_usage_metrics_map[pod_namespace] ||= UsageMetricsUnit.new).add_usage_unit(pod_usage_metrics)
            (@@node_requests_limits_metrics_map[pod_node_name] ||= UsageMetricsUnit.new).add_usage_unit(pod_usage_metrics)
          end
        end

        cluster_usage_metrics = UsageMetricsUnit.new
        @@namespace_usage_metrics_map.each do |namespace, namespace_usage_metrics|
          cluster_usage_metrics.add_usage_unit(namespace_usage_metrics)
          emit_limits_requests_metrics(generate_tag('namespace'), @scraped_at, { 'name' => namespace }, namespace_usage_metrics)
        end

        emit_limits_requests_metrics(generate_tag('cluster'), @scraped_at, { 'name' => @cluster_name }, cluster_usage_metrics)
        @@namespace_usage_metrics_map = {}
      end

      # @return the memoized REST resource for the nodes endpoint
      def node_api
        @node_api ||=
          begin
            @client.discover unless @client.discovered
            @client.rest_client['/nodes'].tap do |endpoint|
              log.debug("Use URL #{endpoint.url} for scraping node metrics")
            end
          end
      end

      def scrape_node_metrics
        response = node_api.get(@client.headers)
        handle_node_response(response)
      rescue StandardError => e
        log.error "Failed to get node metrics, error=#{e.message}, #{e.inspect}"
        log.error_backtrace
      end

      # This method is used to handle responses from the kubeapiserver api
      def handle_node_response(response)
        # Checking response codes only for a successful GET request viz., 2XX codes
        if response.code.between?(200, 299)
          @scraped_node_at = Time.now
          process_node_response MultiJson.load(response.body)
        else
          log.error "Expected 2xx from nodes API, but got #{response.code}. Response body = #{response.body}"
        end
      rescue StandardError => e
        log.error "Failed to scrape node metrics, error=#{e.message}, #{e.inspect}"
        log.error_backtrace
      end

      # Emits capacity and allocatable metrics for every node, plus the
      # reservation and utilization ratios for the nodes the other two
      # scrapers have collected data for. Both per-node accumulators are
      # emptied afterwards.
      def process_node_response(response)
        nodes = Array(response['items'])

        @mutex_node_req_lim.synchronize do
          nodes.each do |node_json|
            node_name = node_json['metadata']['name']
            capacity = node_json['status']['capacity']
            allocatable = node_json['status']['allocatable']

            emit_node_metric('cpu.capacity', node_name, get_cpu_value(capacity['cpu']))
            node_cpu_allocatable = get_cpu_value(allocatable['cpu'])
            emit_node_metric('cpu.allocatable', node_name, node_cpu_allocatable)

            emit_node_metric('memory.capacity', node_name, get_memory_value(capacity['memory']))
            node_memory_allocatable = get_memory_value(allocatable['memory'])
            emit_node_metric('memory.allocatable', node_name, node_memory_allocatable)

            node_req_lim = @@node_requests_limits_metrics_map[node_name]
            next if node_req_lim.nil?

            emit_node_metric('cpu.reservation', node_name, node_req_lim.cpu_request.to_f / node_cpu_allocatable)
            emit_node_metric('memory.reservation', node_name, node_req_lim.memory_request.to_f / node_memory_allocatable)
          end
          @@node_requests_limits_metrics_map = {}
        end

        @mutex_node_res_usage.synchronize do
          nodes.each do |node_json|
            node_name = node_json['metadata']['name']
            allocatable = node_json['status']['allocatable']
            node_cpu_allocatable = get_cpu_value(allocatable['cpu'])
            node_memory_allocatable = get_memory_value(allocatable['memory'])

            node_res_usage = @@node_resource_usage_metrics_map[node_name]
            next if node_res_usage.nil?

            # https://github.com/kubernetes/heapster/blob/c78cc312ab3901acfe5c2f95f7a621909c8455ad/metrics/processors/node_autoscaling_enricher.go#L62
            # Usage was already converted from nano cores to milli cores when
            # it was recorded, so utilization is simply usage / allocatable.
            emit_node_metric('cpu.utilization', node_name, node_res_usage.cpu_usage.to_f / node_cpu_allocatable)
            # Both values are in Mi.
            emit_node_metric('memory.utilization', node_name, node_res_usage.memory_usage.to_f / node_memory_allocatable)
          end
          @@node_resource_usage_metrics_map = {}
        end
      end

      # @return the memoized REST resource for the nodes endpoint, used to
      #   list the nodes whose kubelet summary is scraped
      def resource_usage_api
        @resource_usage_api ||=
          begin
            @client.discover unless @client.discovered
            @client.rest_client['/nodes'].tap do |endpoint|
              log.debug("Use URL #{endpoint.url} for scraping node metrics")
            end
          end
      end

      def scrape_resource_usage_metrics
        response = resource_usage_api.get(@client.headers)
        handle_resource_usage_response(response)
      rescue StandardError => e
        log.error "Failed to get resource usage metrics, error=#{e.message}, #{e.inspect}"
        log.error_backtrace
      end

      # This method is used to handle the node list returned by the kube
      # apiserver before the kubelet summary api of each node is queried
      def handle_resource_usage_response(response)
        # Checking response codes only for a successful GET request viz., 2XX codes
        if response.code.between?(200, 299)
          @scraped_at = Time.now
          process_resource_usage_res MultiJson.load(response.body)
        else
          log.error "Expected 2xx from nodes API, but got #{response.code}. Response body = #{response.body}"
        end
      rescue StandardError => e
        log.error "Failed to scrape resource usage metrics, error=#{e.message}, #{e.inspect}"
        log.error_backtrace
      end

      # Converts one entry of the kubelet summary (a pod or a container).
      #
      # @return [Array(Float, Float), nil] CPU usage in millicores and memory
      #   usage in Mi, or nil when the entry lacks cpu or memory stats
      def resource_usage_of(stats_json)
        return nil if stats_json['cpu'].nil? || stats_json['memory'].nil?

        [
          stats_json['cpu'].fetch('usageNanoCores', 0) / 1_000_000.to_f,
          stats_json['memory'].fetch('usageBytes', 0) / (1024**2).to_f
        ]
      end

      # Adds a usage sample to both the namespace and the node accumulators,
      # creating the entries on first use.
      def record_resource_usage(namespace, node_name, cpu_usage, memory_usage)
        (@@namespace_resource_usage_metrics_map[namespace] ||= ResourceUsageMetricsUnit.new)
          .add_resource_usage_metrics(cpu_usage, memory_usage)
        (@@node_resource_usage_metrics_map[node_name] ||= ResourceUsageMetricsUnit.new)
          .add_resource_usage_metrics(cpu_usage, memory_usage)
      end

      # Queries the kubelet summary of every listed node, accumulates usage
      # per namespace and per node, then emits the namespace and cluster
      # totals. Per-node totals are left in @@node_resource_usage_metrics_map
      # for process_node_response.
      def process_resource_usage_res(response)
        @scraped_node_at = Time.now
        @mutex_node_res_usage.synchronize do
          Array(response['items']).each do |node_json|
            node_name = node_json['metadata']['name']
            @client.discover unless @client.discovered
            node_rest_client =
              @client.rest_client["/nodes/#{node_name}:#{@kubelet_port}/proxy/stats/summary"].tap do |endpoint|
                log.debug("Use URL #{endpoint.url} for scraping resource usage metrics")
              end

            begin
              node_response = JSON.parse(node_rest_client.get(@client.headers))
            rescue RestClient::ServiceUnavailable
              log.warn("Couldn't scrap metric for node '#{node_name} as it is unavailable. Ignoring it.'")
              next
            end

            Array(node_response['pods']).each do |pod_json|
              pod_usage = resource_usage_of(pod_json)
              if pod_usage
                record_resource_usage(pod_json['podRef']['namespace'], node_name, *pod_usage)
                next
              end

              # No pod-level stats: fall back to summing the containers.
              next if pod_json['containers'].nil?

              pod_namespace = pod_json['podRef']['namespace']
              Array(pod_json['containers']).each do |container_json|
                container_usage = resource_usage_of(container_json)
                next unless container_usage

                record_resource_usage(pod_namespace, node_name, *container_usage)
              end
            end
          end
        end

        cluster_usage_metrics = ResourceUsageMetricsUnit.new
        @@namespace_resource_usage_metrics_map.each do |namespace, namespace_usage_metrics|
          cluster_usage_metrics.add_resource_usage_metrics(namespace_usage_metrics.cpu_usage, namespace_usage_metrics.memory_usage)
          emit_resource_usage_metrics(generate_tag('namespace'), @scraped_at, { 'name' => namespace }, namespace_usage_metrics)
        end

        emit_resource_usage_metrics(generate_tag('cluster'), @scraped_at, { 'name' => @cluster_name }, cluster_usage_metrics)
        @@namespace_resource_usage_metrics_map = {}
      end
    end
  end
end