lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.13.0 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.13.1
- old
+ new
@@ -13,10 +13,11 @@
config_param :brokers, :array, :value_type => :string, :default => ['localhost:9092'],
:desc => <<-DESC
Set brokers directly:
<broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
DESC
+ config_param :topic, :string, :default => nil, :desc => "kafka topic. Placeholders are supported"
config_param :topic_key, :string, :default => 'topic', :desc => "Field for kafka topic"
config_param :default_topic, :string, :default => nil,
:desc => "Default output topic when record doesn't have topic field"
config_param :message_key_key, :string, :default => 'message_key', :desc => "Field for kafka message key"
config_param :default_message_key, :string, :default => nil
@@ -213,10 +214,14 @@
end
# TODO: optimize write performance
def write(chunk)
tag = chunk.metadata.tag
- topic = (chunk.metadata.variables && chunk.metadata.variables[@topic_key_sym]) || @default_topic || tag
+ topic = if @topic
+ extract_placeholders(@topic, chunk)
+ else
+ (chunk.metadata.variables && chunk.metadata.variables[@topic_key_sym]) || @default_topic || tag
+ end
messages = 0
record_buf = nil
base_headers = @headers