lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.14.1 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.14.2

- old
+ new

@@ -67,10 +67,11 @@ :desc => <<-DESC The codec the producer uses to compress messages. Supported codecs depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression DESC config_param :max_send_limit_bytes, :size, :default => nil + config_param :discard_kafka_delivery_failed, :bool, :default => false config_param :active_support_notification_regex, :string, :default => nil, :desc => <<-DESC Add a regular expression to capture ActiveSupport notifications from the Kafka client requires activesupport gem - records will be generated under fluent_kafka_stats.** DESC @@ -265,10 +266,19 @@ create_time: @use_event_time ? Time.at(time) : Time.now) } if messages > 0 log.debug { "#{messages} messages send." } - producer.deliver_messages + if @discard_kafka_delivery_failed + begin + producer.deliver_messages + rescue Kafka::DeliveryFailed => e + log.warn "DeliveryFailed occurred. Discard broken event:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag + producer.clear_buffer + end + else + producer.deliver_messages + end end rescue Kafka::UnknownTopicOrPartition if @use_default_for_unknown_topic && topic != @default_topic producer.shutdown if producer log.warn "'#{topic}' topic not found. Retry with '#{default_topic}' topic"