lib/fluent/plugin/in_kubernetes_objects.rb in fluent-plugin-kubernetes-objects-1.1.9 vs lib/fluent/plugin/in_kubernetes_objects.rb in fluent-plugin-kubernetes-objects-1.1.10

- old
+ new

@@ -198,11 +198,16 @@ thread_create :"pull_#{resource_name}" do tag = generate_tag resource_name while thread_current_running? log.debug "Going to pull #{resource_name}" - response = @client.public_send "get_#{resource_name}", options + begin + response = @client.public_send "get_#{resource_name}", options + rescue Kubeclient::ResourceNotFoundError, NoMethodError + log.error "resource '#{resource_name}' not found. Stopped pulling it" + break + end now = Fluent::Engine.now es = Fluent::MultiEventStream.new # code copied from kubeclient # kubeclient will create one open struct object for each item in the response, @@ -241,25 +246,30 @@ options[:resource_version] = 0 end thread_create :"watch_#{resource_name}" do while thread_current_running? - @client.public_send("watch_#{resource_name}", options).tap do |watcher| - tag = generate_tag "#{resource_name}" - begin - watcher.each do |entity| - begin - entity = JSON.parse(entity) - router.emit tag, Fluent::Engine.now, entity - options[:resource_version] = entity['object']['metadata']['resourceVersion'] - @storage.put resource_name, entity['object']['metadata']['resourceVersion'] - rescue => e - log.info "Got exception #{e} parsing entity #{entity}. Resetting watcher." + begin + @client.public_send("watch_#{resource_name}", options).tap do |watcher| + tag = generate_tag "#{resource_name}" + begin + watcher.each do |entity| + begin + entity = JSON.parse(entity) + router.emit tag, Fluent::Engine.now, entity + options[:resource_version] = entity['object']['metadata']['resourceVersion'] + @storage.put resource_name, entity['object']['metadata']['resourceVersion'] + rescue => e + log.info "Got exception #{e} parsing entity #{entity}. Resetting watcher." + end end + rescue => e + log.info "Got exception #{e}. Resetting watcher." end - rescue => e - log.info "Got exception #{e}. Resetting watcher." end + rescue Kubeclient::ResourceNotFoundError, NoMethodError + log.error "resource '#{resource_name}' not found. Stopped watching it" + break end end end end end