lib/fluent/plugin/in_kubernetes_objects.rb in fluent-plugin-kubernetes-objects-1.1.9 vs lib/fluent/plugin/in_kubernetes_objects.rb in fluent-plugin-kubernetes-objects-1.1.10
- old
+ new
@@ -198,11 +198,16 @@
thread_create :"pull_#{resource_name}" do
tag = generate_tag resource_name
while thread_current_running?
log.debug "Going to pull #{resource_name}"
- response = @client.public_send "get_#{resource_name}", options
+ begin
+ response = @client.public_send "get_#{resource_name}", options
+ rescue Kubeclient::ResourceNotFoundError, NoMethodError
+ log.error "resource '#{resource_name}' not found. Stopped pulling it"
+ break
+ end
now = Fluent::Engine.now
es = Fluent::MultiEventStream.new
# code copied from kubeclient
# kubeclient will create one open struct object for each item in the response,
@@ -241,25 +246,30 @@
options[:resource_version] = 0
end
thread_create :"watch_#{resource_name}" do
while thread_current_running?
- @client.public_send("watch_#{resource_name}", options).tap do |watcher|
- tag = generate_tag "#{resource_name}"
- begin
- watcher.each do |entity|
- begin
- entity = JSON.parse(entity)
- router.emit tag, Fluent::Engine.now, entity
- options[:resource_version] = entity['object']['metadata']['resourceVersion']
- @storage.put resource_name, entity['object']['metadata']['resourceVersion']
- rescue => e
- log.info "Got exception #{e} parsing entity #{entity}. Resetting watcher."
+ begin
+ @client.public_send("watch_#{resource_name}", options).tap do |watcher|
+ tag = generate_tag "#{resource_name}"
+ begin
+ watcher.each do |entity|
+ begin
+ entity = JSON.parse(entity)
+ router.emit tag, Fluent::Engine.now, entity
+ options[:resource_version] = entity['object']['metadata']['resourceVersion']
+ @storage.put resource_name, entity['object']['metadata']['resourceVersion']
+ rescue => e
+ log.info "Got exception #{e} parsing entity #{entity}. Resetting watcher."
+ end
end
+ rescue => e
+ log.info "Got exception #{e}. Resetting watcher."
end
- rescue => e
- log.info "Got exception #{e}. Resetting watcher."
end
+ rescue Kubeclient::ResourceNotFoundError, NoMethodError
+ log.error "resource '#{resource_name}' not found. Stopped watching it"
+ break
end
end
end
end
end