lib/fluent/plugin/in_kubernetes_objects.rb in fluent-plugin-kubernetes-objects-1.0.0 vs lib/fluent/plugin/in_kubernetes_objects.rb in fluent-plugin-kubernetes-objects-1.1.0
- old
+ new
@@ -1,13 +1,13 @@
# frozen_string_literal: true
-require "fluent/plugin/input"
+require 'fluent/plugin/input'
require 'kubeclient'
module Fluent::Plugin
class KubernetesObjectsInput < Fluent::Plugin::Input
- VERSION = "1.0.0"
+ VERSION = '1.1.0'.freeze
Fluent::Plugin.register_input('kubernetes_objects', self)
helpers :storage, :thread
@@ -74,21 +74,21 @@
config_param :field_selector, :string, default: nil
end
config_section :storage do
# use memory by default
- config_set_default :usage, "checkpoints"
- config_set_default :@type, "local"
+ config_set_default :usage, 'checkpoints'
+ config_set_default :@type, 'local'
config_set_default :persistent, false
end
def configure(conf)
super
- raise Fluent::ConfigError, "At least one <pull> or <watch> is required, but found none." if @pull_objects.empty? && @watch_objects.empty?
+ raise Fluent::ConfigError, 'At least one <pull> or <watch> is required, but found none.' if @pull_objects.empty? && @watch_objects.empty?
- @storage = storage_create usage: "checkpoints"
+ @storage = storage_create usage: 'checkpoints'
parse_tag
initialize_client
end
@@ -116,122 +116,122 @@
end
def initialize_client
# mostly borrowed from Fluentd Kubernetes Metadata Filter Plugin
if @kubernetes_url.nil?
- # Use Kubernetes default service account if we're in a pod.
- env_host = ENV['KUBERNETES_SERVICE_HOST']
- env_port = ENV['KUBERNETES_SERVICE_PORT']
- if env_host && env_port
- @kubernetes_url = "https://#{env_host}:#{env_port}/#{@api_version == 'v1' ? 'api' : 'apis'}"
- end
+ # Use Kubernetes default service account if we're in a pod.
+ env_host = ENV['KUBERNETES_SERVICE_HOST']
+ env_port = ENV['KUBERNETES_SERVICE_PORT']
+ if env_host && env_port
+ @kubernetes_url = "https://#{env_host}:#{env_port}/#{@api_version == 'v1' ? 'api' : 'apis'}"
+ end
end
- raise Fluent::ConfigError, "kubernetes url is not set" unless @kubernetes_url
+ raise Fluent::ConfigError, 'kubernetes url is not set' unless @kubernetes_url
# Use SSL certificate and bearer token from Kubernetes service account.
if Dir.exist?(@secret_dir)
- secret_ca_file = File.join(@secret_dir, 'ca.cert')
- secret_token_file = File.join(@secret_dir, 'token')
+ secret_ca_file = File.join(@secret_dir, 'ca.crt')
+ secret_token_file = File.join(@secret_dir, 'token')
- if @ca_file.nil? and File.exist?(secret_ca_file)
- @ca_file = secret_ca_file
- end
+ if @ca_file.nil? && File.exist?(secret_ca_file)
+ @ca_file = secret_ca_file
+ end
- if @bearer_token_file.nil? and File.exist?(secret_token_file)
- @bearer_token_file = secret_token_file
- end
- end
+ if @bearer_token_file.nil? && File.exist?(secret_token_file)
+ @bearer_token_file = secret_token_file
+ end
+ end
ssl_options = {
- client_cert: @client_cert && OpenSSL::X509::Certificate.new(File.read(@client_cert)),
- client_key: @client_key && OpenSSL::PKey::RSA.new(File.read(@client_key)),
- ca_file: @ca_file,
- verify_ssl: @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
+ client_cert: @client_cert && OpenSSL::X509::Certificate.new(File.read(@client_cert)),
+ client_key: @client_key && OpenSSL::PKey::RSA.new(File.read(@client_key)),
+ ca_file: @ca_file,
+ verify_ssl: @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
}
auth_options = {}
auth_options[:bearer_token] = File.read(@bearer_token_file) if @bearer_token_file
@client = Kubeclient::Client.new(
- @kubernetes_url, @api_version,
- ssl_options: ssl_options,
- auth_options: auth_options
+ @kubernetes_url, @api_version,
+ ssl_options: ssl_options,
+ auth_options: auth_options
)
begin
- @client.api_valid?
+ @client.api_valid?
rescue KubeException => kube_error
- raise Fluent::ConfigError, "Invalid Kubernetes API #{@api_version} endpoint #{@kubernetes_url}: #{kube_error.message}"
- end
+ raise Fluent::ConfigError, "Invalid Kubernetes API #{@api_version} endpoint #{@kubernetes_url}: #{kube_error.message}"
+ end
end
def start_pullers
@pull_objects.each(&method(:create_pull_thread))
end
def start_watchers
@watchers = @watch_objects.map do |o|
- o = o.to_h.dup
- o[:as] = :raw
- resource_name = o.delete(:resource_name)
- version = @storage.get(resource_name)
- o[:resource_version] = version if version
- @client.public_send("watch_#{resource_name}", o).tap { |watcher|
- create_watcher_thread resource_name, watcher
- }
+ o = o.to_h.dup
+ o[:as] = :raw
+ resource_name = o.delete(:resource_name)
+ version = @storage.get(resource_name)
+ o[:resource_version] = version if version
+ @client.public_send("watch_#{resource_name}", o).tap do |watcher|
+ create_watcher_thread resource_name, watcher
+ end
end
end
def create_pull_thread(conf)
options = conf.to_h.dup
options[:as] = :raw
resource_name = options.delete :resource_name
pull_interval = options.delete :interval
thread_create :"pull_#{resource_name}" do
- tag = generate_tag resource_name
- while thread_current_running?
- log.debug "Going to pull #{resource_name}"
- response = @client.public_send "get_#{resource_name}", options
- now = Fluent::Engine.now
- es = Fluent::MultiEventStream.new
+ tag = generate_tag resource_name
+ while thread_current_running?
+ log.debug "Going to pull #{resource_name}"
+ response = @client.public_send "get_#{resource_name}", options
+ now = Fluent::Engine.now
+ es = Fluent::MultiEventStream.new
- # code copied from kubeclient
- # kubeclient will create one open struct object for each item in the response,
- # but this is totally unecessary in this plugin, thus we use as: :raw.
- result = JSON.parse(response)
+ # code copied from kubeclient
+ # kubeclient will create one open struct object for each item in the response,
+ # but this is totally unecessary in this plugin, thus we use as: :raw.
+ result = JSON.parse(response)
- resource_version = result.fetch('resourceVersion') {
- result.fetch('metadata', {})['resourceVersion']
- }
+ resource_version = result.fetch('resourceVersion') do
+ result.fetch('metadata', {})['resourceVersion']
+ end
- update_op = if resource_version
- ->(item) { item['metadata'].update requestResourceVersion: resource_version }
- else
- ->(item) {}
- end
+ update_op = if resource_version
+ ->(item) { item['metadata'].update requestResourceVersion: resource_version }
+ else
+ ->(item) {}
+ end
- # result['items'] might be nil due to https://github.com/kubernetes/kubernetes/issues/13096
- items = result['items'].to_a
- log.debug { "Received #{items.size} #{resource_name}" }
- items.each { |item| es.add now, item.tap(&update_op) }
- router.emit_stream(tag, es)
+ # result['items'] might be nil due to https://github.com/kubernetes/kubernetes/issues/13096
+ items = result['items'].to_a
+ log.debug { "Received #{items.size} #{resource_name}" }
+ items.each { |item| es.add now, item.tap(&update_op) }
+ router.emit_stream(tag, es)
- sleep(pull_interval)
- end
+ sleep(pull_interval)
+ end
end
end
def create_watcher_thread(object_name, watcher)
- thread_create(:"watch_#{object_name}") {
- tag = generate_tag "#{object_name}.watch"
- watcher.each { |entity|
- log.trace { "Received new object from watching #{object_name}"}
- entity = JSON.parse(entity)
- router.emit tag, Fluent::Engine.now, entity
- @storage.put object_name, entity['object']['metadata']['resourceVersion']
- }
- }
+ thread_create(:"watch_#{object_name}") do
+ tag = generate_tag "#{object_name}.watch"
+ watcher.each do |entity|
+ log.trace { "Received new object from watching #{object_name}" }
+ entity = JSON.parse(entity)
+ router.emit tag, Fluent::Engine.now, entity
+ @storage.put object_name, entity['object']['metadata']['resourceVersion']
+ end
+ end
end
end
end