lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.8.1 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.8.2
- old
+ new
@@ -23,10 +23,11 @@
config_param :partition_key_key, :string, :default => 'partition_key', :desc => "Field for kafka partition key"
config_param :default_partition_key, :string, :default => nil
config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition"
config_param :default_partition, :integer, :default => nil
config_param :client_id, :string, :default => 'fluentd'
+ config_param :idempotent, :bool, :default => false, :desc => 'Enable idempotent producer'
config_param :sasl_over_ssl, :bool, :default => true,
:desc => <<-DESC
Set to false to prevent SSL strict mode when using SASL authentication
DESC
config_param :exclude_partition_key, :bool, :default => false,
@@ -131,10 +132,10 @@
if @chunk_key_tag
log.warn "default_topic is set. Fluentd's event tag is not used for topic"
end
end
- @producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks}
+ @producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks, idempotent: @idempotent}
@producer_opts[:ack_timeout] = @ack_timeout if @ack_timeout
@producer_opts[:compression_codec] = @compression_codec.to_sym if @compression_codec
if @active_support_notification_regex
require 'active_support/notifications'
require 'active_support/core_ext/hash/keys'