lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.14.1 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.14.2
- old
+ new
@@ -67,10 +67,11 @@
:desc => <<-DESC
The codec the producer uses to compress messages.
Supported codecs depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression
DESC
config_param :max_send_limit_bytes, :size, :default => nil
+ config_param :discard_kafka_delivery_failed, :bool, :default => false
config_param :active_support_notification_regex, :string, :default => nil,
:desc => <<-DESC
Add a regular expression to capture ActiveSupport notifications from the Kafka client
requires activesupport gem - records will be generated under fluent_kafka_stats.**
DESC
@@ -265,10 +266,19 @@
create_time: @use_event_time ? Time.at(time) : Time.now)
}
if messages > 0
log.debug { "#{messages} messages send." }
- producer.deliver_messages
+ if @discard_kafka_delivery_failed
+ begin
+ producer.deliver_messages
+ rescue Kafka::DeliveryFailed => e
+ log.warn "DeliveryFailed occurred. Discard broken event:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
+ producer.clear_buffer
+ end
+ else
+ producer.deliver_messages
+ end
end
rescue Kafka::UnknownTopicOrPartition
if @use_default_for_unknown_topic && topic != @default_topic
producer.shutdown if producer
log.warn "'#{topic}' topic not found. Retry with '#{default_topic}' topic"