lib/fluent/plugin/in_kubernetes_objects.rb in fluent-plugin-kubernetes-objects-1.1.1 vs lib/fluent/plugin/in_kubernetes_objects.rb in fluent-plugin-kubernetes-objects-1.1.2

- old
+ new

@@ -3,11 +3,11 @@ require 'fluent/plugin/input' require 'kubeclient' module Fluent::Plugin class KubernetesObjectsInput < Fluent::Plugin::Input - VERSION = '1.1.1'.freeze + VERSION = '1.1.2'.freeze Fluent::Plugin.register_input('kubernetes_objects', self) helpers :storage, :thread @@ -179,13 +179,11 @@ resource_name = o.delete(:resource_name) watch_interval = o.delete(:interval) version = @storage.get(resource_name) o[:resource_version] = version if version - @client.public_send("watch_#{resource_name}", o).tap do |watcher| - create_watcher_thread resource_name, watcher, watch_interval - end + create_watcher_thread resource_name, o, watch_interval end end def create_pull_thread(conf) options = conf.to_h.dup @@ -227,19 +225,19 @@ end end def create_watcher_thread(object_name, watcher, interval) thread_create(:"watch_#{object_name}") do - tag = generate_tag "#{object_name}.watch" - while thread_current_running? + @client.public_send("watch_#{object_name}", watcher).tap { |watcher| + tag = generate_tag "#{object_name}.watch" watcher.each do |entity| log.trace { "Received new object from watching #{object_name}" } entity = JSON.parse(entity) router.emit tag, Fluent::Engine.now, entity @storage.put object_name, entity['object']['metadata']['resourceVersion'] sleep(interval) end - end + } end end end end