lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.7.5 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.7.6
- old
+ new
@@ -34,10 +34,13 @@
config_param :exclude_topic_key, :bool, :default => false,
:desc => 'Set true to remove topic name key from data'
config_param :get_kafka_client_log, :bool, :default => false
+ config_param :ignore_exceptions, :array, :default => [], value_type: :string, :desc => "Ignorable exception list"
+ config_param :exception_backup, :bool, :default => true, :desc => "Chunk backup flag when ignore exception occured"
+
# ruby-kafka producer options
config_param :max_send_retries, :integer, :default => 2,
:desc => "Number of times to retry sending of messages to a leader."
config_param :required_acks, :integer, :default => -1,
:desc => "The number of acks required per request."
@@ -216,15 +219,20 @@
log.debug { "#{messages} messages send." }
producer.deliver_messages
end
end
rescue Exception => e
+ ignore = @ignore_exceptions.include?(e.class.name)
+
log.warn "Send exception occurred: #{e}"
log.warn "Exception Backtrace : #{e.backtrace.join("\n")}"
+ log.warn "Exception ignored in tag : #{tag}" if ignore
# For safety, refresh client and its producers
refresh_client(false)
+ # raise UnrecoverableError for backup ignored exception chunk
+ raise Fluent::UnrecoverableError if ignore && exception_backup
# Raise exception to retry sendind messages
- raise e
+ raise e unless ignore
ensure
producer.shutdown if producer
end
end
end