lib/fluent/plugin/in_kubernetes_objects.rb in fluent-plugin-kubernetes-objects-1.1.1 vs lib/fluent/plugin/in_kubernetes_objects.rb in fluent-plugin-kubernetes-objects-1.1.2
- old
+ new
@@ -3,11 +3,11 @@
require 'fluent/plugin/input'
require 'kubeclient'
module Fluent::Plugin
class KubernetesObjectsInput < Fluent::Plugin::Input
- VERSION = '1.1.1'.freeze
+ VERSION = '1.1.2'.freeze
Fluent::Plugin.register_input('kubernetes_objects', self)
helpers :storage, :thread
@@ -179,13 +179,11 @@
resource_name = o.delete(:resource_name)
watch_interval = o.delete(:interval)
version = @storage.get(resource_name)
o[:resource_version] = version if version
- @client.public_send("watch_#{resource_name}", o).tap do |watcher|
- create_watcher_thread resource_name, watcher, watch_interval
- end
+ create_watcher_thread resource_name, o, watch_interval
end
end
def create_pull_thread(conf)
options = conf.to_h.dup
@@ -227,19 +225,19 @@
end
end
def create_watcher_thread(object_name, watcher, interval)
thread_create(:"watch_#{object_name}") do
- tag = generate_tag "#{object_name}.watch"
- while thread_current_running?
+ @client.public_send("watch_#{object_name}", watcher).tap { |watcher|
+ tag = generate_tag "#{object_name}.watch"
watcher.each do |entity|
log.trace { "Received new object from watching #{object_name}" }
entity = JSON.parse(entity)
router.emit tag, Fluent::Engine.now, entity
@storage.put object_name, entity['object']['metadata']['resourceVersion']
sleep(interval)
end
- end
+ }
end
end
end
end