lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.13.0 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.13.1

- old
+ new

@@ -13,10 +13,11 @@ config_param :brokers, :array, :value_type => :string, :default => ['localhost:9092'], :desc => <<-DESC Set brokers directly: <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. DESC + config_param :topic, :string, :default => nil, :desc => "kafka topic. Placeholders are supported" config_param :topic_key, :string, :default => 'topic', :desc => "Field for kafka topic" config_param :default_topic, :string, :default => nil, :desc => "Default output topic when record doesn't have topic field" config_param :message_key_key, :string, :default => 'message_key', :desc => "Field for kafka message key" config_param :default_message_key, :string, :default => nil @@ -213,10 +214,14 @@ end # TODO: optimize write performance def write(chunk) tag = chunk.metadata.tag - topic = (chunk.metadata.variables && chunk.metadata.variables[@topic_key_sym]) || @default_topic || tag + topic = if @topic + extract_placeholders(@topic, chunk) + else + (chunk.metadata.variables && chunk.metadata.variables[@topic_key_sym]) || @default_topic || tag + end messages = 0 record_buf = nil base_headers = @headers