lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.8.1 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.8.2

- old
+ new

@@ -23,10 +23,11 @@ config_param :partition_key_key, :string, :default => 'partition_key', :desc => "Field for kafka partition key" config_param :default_partition_key, :string, :default => nil config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition" config_param :default_partition, :integer, :default => nil config_param :client_id, :string, :default => 'fluentd' + config_param :idempotent, :bool, :default => false, :desc => 'Enable idempotent producer' config_param :sasl_over_ssl, :bool, :default => true, :desc => <<-DESC Set to false to prevent SSL strict mode when using SASL authentication DESC config_param :exclude_partition_key, :bool, :default => false, @@ -131,10 +132,10 @@ if @chunk_key_tag log.warn "default_topic is set. Fluentd's event tag is not used for topic" end end - @producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks} + @producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks, idempotent: @idempotent} @producer_opts[:ack_timeout] = @ack_timeout if @ack_timeout @producer_opts[:compression_codec] = @compression_codec.to_sym if @compression_codec if @active_support_notification_regex require 'active_support/notifications' require 'active_support/core_ext/hash/keys'