lib/fluent/plugin/in_kubernetes_objects.rb in fluent-plugin-kubernetes-objects-1.0.0 vs lib/fluent/plugin/in_kubernetes_objects.rb in fluent-plugin-kubernetes-objects-1.1.0

- old
+ new

@@ -1,13 +1,13 @@ # frozen_string_literal: true -require "fluent/plugin/input" +require 'fluent/plugin/input' require 'kubeclient' module Fluent::Plugin class KubernetesObjectsInput < Fluent::Plugin::Input - VERSION = "1.0.0" + VERSION = '1.1.0'.freeze Fluent::Plugin.register_input('kubernetes_objects', self) helpers :storage, :thread @@ -74,21 +74,21 @@ config_param :field_selector, :string, default: nil end config_section :storage do # use memory by default - config_set_default :usage, "checkpoints" - config_set_default :@type, "local" + config_set_default :usage, 'checkpoints' + config_set_default :@type, 'local' config_set_default :persistent, false end def configure(conf) super - raise Fluent::ConfigError, "At least one <pull> or <watch> is required, but found none." if @pull_objects.empty? && @watch_objects.empty? + raise Fluent::ConfigError, 'At least one <pull> or <watch> is required, but found none.' if @pull_objects.empty? && @watch_objects.empty? - @storage = storage_create usage: "checkpoints" + @storage = storage_create usage: 'checkpoints' parse_tag initialize_client end @@ -116,122 +116,122 @@ end def initialize_client # mostly borrowed from Fluentd Kubernetes Metadata Filter Plugin if @kubernetes_url.nil? - # Use Kubernetes default service account if we're in a pod. - env_host = ENV['KUBERNETES_SERVICE_HOST'] - env_port = ENV['KUBERNETES_SERVICE_PORT'] - if env_host && env_port - @kubernetes_url = "https://#{env_host}:#{env_port}/#{@api_version == 'v1' ? 'api' : 'apis'}" - end + # Use Kubernetes default service account if we're in a pod. + env_host = ENV['KUBERNETES_SERVICE_HOST'] + env_port = ENV['KUBERNETES_SERVICE_PORT'] + if env_host && env_port + @kubernetes_url = "https://#{env_host}:#{env_port}/#{@api_version == 'v1' ? 'api' : 'apis'}" + end end - raise Fluent::ConfigError, "kubernetes url is not set" unless @kubernetes_url + raise Fluent::ConfigError, 'kubernetes url is not set' unless @kubernetes_url # Use SSL certificate and bearer token from Kubernetes service account. if Dir.exist?(@secret_dir) - secret_ca_file = File.join(@secret_dir, 'ca.cert') - secret_token_file = File.join(@secret_dir, 'token') + secret_ca_file = File.join(@secret_dir, 'ca.crt') + secret_token_file = File.join(@secret_dir, 'token') - if @ca_file.nil? and File.exist?(secret_ca_file) - @ca_file = secret_ca_file - end + if @ca_file.nil? && File.exist?(secret_ca_file) + @ca_file = secret_ca_file + end - if @bearer_token_file.nil? and File.exist?(secret_token_file) - @bearer_token_file = secret_token_file - end - end + if @bearer_token_file.nil? && File.exist?(secret_token_file) + @bearer_token_file = secret_token_file + end + end ssl_options = { - client_cert: @client_cert && OpenSSL::X509::Certificate.new(File.read(@client_cert)), - client_key: @client_key && OpenSSL::PKey::RSA.new(File.read(@client_key)), - ca_file: @ca_file, - verify_ssl: @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER + client_cert: @client_cert && OpenSSL::X509::Certificate.new(File.read(@client_cert)), + client_key: @client_key && OpenSSL::PKey::RSA.new(File.read(@client_key)), + ca_file: @ca_file, + verify_ssl: @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER } auth_options = {} auth_options[:bearer_token] = File.read(@bearer_token_file) if @bearer_token_file @client = Kubeclient::Client.new( - @kubernetes_url, @api_version, - ssl_options: ssl_options, - auth_options: auth_options + @kubernetes_url, @api_version, + ssl_options: ssl_options, + auth_options: auth_options ) begin - @client.api_valid? + @client.api_valid? rescue KubeException => kube_error - raise Fluent::ConfigError, "Invalid Kubernetes API #{@api_version} endpoint #{@kubernetes_url}: #{kube_error.message}" - end + raise Fluent::ConfigError, "Invalid Kubernetes API #{@api_version} endpoint #{@kubernetes_url}: #{kube_error.message}" + end end def start_pullers @pull_objects.each(&method(:create_pull_thread)) end def start_watchers @watchers = @watch_objects.map do |o| - o = o.to_h.dup - o[:as] = :raw - resource_name = o.delete(:resource_name) - version = @storage.get(resource_name) - o[:resource_version] = version if version - @client.public_send("watch_#{resource_name}", o).tap { |watcher| - create_watcher_thread resource_name, watcher - } + o = o.to_h.dup + o[:as] = :raw + resource_name = o.delete(:resource_name) + version = @storage.get(resource_name) + o[:resource_version] = version if version + @client.public_send("watch_#{resource_name}", o).tap do |watcher| + create_watcher_thread resource_name, watcher + end end end def create_pull_thread(conf) options = conf.to_h.dup options[:as] = :raw resource_name = options.delete :resource_name pull_interval = options.delete :interval thread_create :"pull_#{resource_name}" do - tag = generate_tag resource_name - while thread_current_running? - log.debug "Going to pull #{resource_name}" - response = @client.public_send "get_#{resource_name}", options - now = Fluent::Engine.now - es = Fluent::MultiEventStream.new + tag = generate_tag resource_name + while thread_current_running? + log.debug "Going to pull #{resource_name}" + response = @client.public_send "get_#{resource_name}", options + now = Fluent::Engine.now + es = Fluent::MultiEventStream.new - # code copied from kubeclient - # kubeclient will create one open struct object for each item in the response, - # but this is totally unecessary in this plugin, thus we use as: :raw. - result = JSON.parse(response) + # code copied from kubeclient + # kubeclient will create one open struct object for each item in the response, + # but this is totally unecessary in this plugin, thus we use as: :raw. + result = JSON.parse(response) - resource_version = result.fetch('resourceVersion') { - result.fetch('metadata', {})['resourceVersion'] - } + resource_version = result.fetch('resourceVersion') do + result.fetch('metadata', {})['resourceVersion'] + end - update_op = if resource_version - ->(item) { item['metadata'].update requestResourceVersion: resource_version } - else - ->(item) {} - end + update_op = if resource_version + ->(item) { item['metadata'].update requestResourceVersion: resource_version } + else + ->(item) {} + end - # result['items'] might be nil due to https://github.com/kubernetes/kubernetes/issues/13096 - items = result['items'].to_a - log.debug { "Received #{items.size} #{resource_name}" } - items.each { |item| es.add now, item.tap(&update_op) } - router.emit_stream(tag, es) + # result['items'] might be nil due to https://github.com/kubernetes/kubernetes/issues/13096 + items = result['items'].to_a + log.debug { "Received #{items.size} #{resource_name}" } + items.each { |item| es.add now, item.tap(&update_op) } + router.emit_stream(tag, es) - sleep(pull_interval) - end + sleep(pull_interval) + end end end def create_watcher_thread(object_name, watcher) - thread_create(:"watch_#{object_name}") { - tag = generate_tag "#{object_name}.watch" - watcher.each { |entity| - log.trace { "Received new object from watching #{object_name}"} - entity = JSON.parse(entity) - router.emit tag, Fluent::Engine.now, entity - @storage.put object_name, entity['object']['metadata']['resourceVersion'] - } - } + thread_create(:"watch_#{object_name}") do + tag = generate_tag "#{object_name}.watch" + watcher.each do |entity| + log.trace { "Received new object from watching #{object_name}" } + entity = JSON.parse(entity) + router.emit tag, Fluent::Engine.now, entity + @storage.put object_name, entity['object']['metadata']['resourceVersion'] + end + end end end end