lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.17.0 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.17.1

- old
+ new

@@ -40,10 +40,12 @@ :desc => 'Set true to remove partition from data' config_param :exclude_message_key, :bool, :default => false, :desc => 'Set true to remove message key from data' config_param :exclude_topic_key, :bool, :default => false, :desc => 'Set true to remove topic name key from data' + config_param :exclude_fields, :array, :default => [], value_type: :string, + :desc => 'Fields to remove from data where the value is a jsonpath to a record value' config_param :use_event_time, :bool, :default => false, :desc => 'Use fluentd event time for kafka create_time' config_param :headers, :hash, default: {}, symbolize_keys: true, value_type: :string, :desc => 'Kafka message headers' config_param :headers_from_record, :hash, default: {}, symbolize_keys: true, value_type: :string, :desc => 'Kafka message headers where the header value is a jsonpath to a record value' @@ -175,10 +177,14 @@ @headers_from_record_accessors = {} @headers_from_record.each do |key, value| @headers_from_record_accessors[key] = record_accessor_create(value) end + + @exclude_field_accessors = @exclude_fields.map do |field| + record_accessor_create(field) + end end def multi_workers_ready? true end @@ -233,11 +239,11 @@ base_headers = @headers mutate_headers = !@headers_from_record_accessors.empty? begin - producer = @kafka.topic_producer(topic, @producer_opts) + producer = @kafka.topic_producer(topic, **@producer_opts) chunk.msgpack_each { |time, record| begin record = inject_values_to_record(tag, time, record) record.delete(@topic_key) if @exclude_topic_key partition_key = (@exclude_partition_key ? record.delete(@partition_key_key) : record[@partition_key_key]) || @default_partition_key @@ -249,9 +255,15 @@ @headers_from_record_accessors.each do |key, header_accessor| headers[key] = header_accessor.call(record) end else headers = base_headers + end + + unless @exclude_fields.empty? + @exclude_field_accessors.each do |exclude_field_accessor| + exclude_field_accessor.delete(record) + end end record_buf = @formatter_proc.call(tag, time, record) record_buf_bytes = record_buf.bytesize if @max_send_limit_bytes && record_buf_bytes > @max_send_limit_bytes