lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.7.5 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.7.6

- old
+ new

@@ -34,10 +34,13 @@ config_param :exclude_topic_key, :bool, :default => false, :desc => 'Set true to remove topic name key from data' config_param :get_kafka_client_log, :bool, :default => false + config_param :ignore_exceptions, :array, :default => [], value_type: :string, :desc => "Ignorable exception list" + config_param :exception_backup, :bool, :default => true, :desc => "Chunk backup flag when ignore exception occured" + # ruby-kafka producer options config_param :max_send_retries, :integer, :default => 2, :desc => "Number of times to retry sending of messages to a leader." config_param :required_acks, :integer, :default => -1, :desc => "The number of acks required per request." @@ -216,15 +219,20 @@ log.debug { "#{messages} messages send." } producer.deliver_messages end end rescue Exception => e + ignore = @ignore_exceptions.include?(e.class.name) + log.warn "Send exception occurred: #{e}" log.warn "Exception Backtrace : #{e.backtrace.join("\n")}" + log.warn "Exception ignored in tag : #{tag}" if ignore # For safety, refresh client and its producers refresh_client(false) + # raise UnrecoverableError for backup ignored exception chunk + raise Fluent::UnrecoverableError if ignore && exception_backup # Raise exception to retry sendind messages - raise e + raise e unless ignore ensure producer.shutdown if producer end end end