lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.17.0 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.17.1
- old
+ new
@@ -40,10 +40,12 @@
:desc => 'Set true to remove partition from data'
config_param :exclude_message_key, :bool, :default => false,
:desc => 'Set true to remove message key from data'
config_param :exclude_topic_key, :bool, :default => false,
:desc => 'Set true to remove topic name key from data'
+ config_param :exclude_fields, :array, :default => [], value_type: :string,
+ :desc => 'Fields to remove from data where the value is a jsonpath to a record value'
config_param :use_event_time, :bool, :default => false, :desc => 'Use fluentd event time for kafka create_time'
config_param :headers, :hash, default: {}, symbolize_keys: true, value_type: :string,
:desc => 'Kafka message headers'
config_param :headers_from_record, :hash, default: {}, symbolize_keys: true, value_type: :string,
:desc => 'Kafka message headers where the header value is a jsonpath to a record value'
@@ -175,10 +177,14 @@
@headers_from_record_accessors = {}
@headers_from_record.each do |key, value|
@headers_from_record_accessors[key] = record_accessor_create(value)
end
+
+ @exclude_field_accessors = @exclude_fields.map do |field|
+ record_accessor_create(field)
+ end
end
def multi_workers_ready?
true
end
@@ -233,11 +239,11 @@
base_headers = @headers
mutate_headers = !@headers_from_record_accessors.empty?
begin
- producer = @kafka.topic_producer(topic, @producer_opts)
+ producer = @kafka.topic_producer(topic, **@producer_opts)
chunk.msgpack_each { |time, record|
begin
record = inject_values_to_record(tag, time, record)
record.delete(@topic_key) if @exclude_topic_key
partition_key = (@exclude_partition_key ? record.delete(@partition_key_key) : record[@partition_key_key]) || @default_partition_key
@@ -249,9 +255,15 @@
@headers_from_record_accessors.each do |key, header_accessor|
headers[key] = header_accessor.call(record)
end
else
headers = base_headers
+ end
+
+ unless @exclude_fields.empty?
+ @exclude_field_accessors.each do |exclude_field_accessor|
+ exclude_field_accessor.delete(record)
+ end
end
record_buf = @formatter_proc.call(tag, time, record)
record_buf_bytes = record_buf.bytesize
if @max_send_limit_bytes && record_buf_bytes > @max_send_limit_bytes