lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.6.5 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.6.6

- old
+ new

@@ -13,14 +13,18 @@ config_param :brokers, :array, :value_type => :string, :default => ['localhost:9092'], :desc => <<-DESC Set brokers directly: <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. DESC + config_param :topic_key, :string, :default => 'topic', :desc => "Field for kafka topic" config_param :default_topic, :string, :default => nil, :desc => "Default output topic when record doesn't have topic field" + config_param :message_key_key, :string, :default => 'message_key', :desc => "Field for kafka message key" config_param :default_message_key, :string, :default => nil + config_param :partition_key_key, :string, :default => 'partition_key', :desc => "Field for kafka partition key" config_param :default_partition_key, :string, :default => nil + config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition" config_param :default_partition, :integer, :default => nil config_param :client_id, :string, :default => 'fluentd' config_param :exclude_partition_key, :bool, :default => false, :desc => 'Set true to remove partition key from data' config_param :exclude_partition, :bool, :default => false, @@ -121,10 +125,12 @@ event = ActiveSupport::Notifications::Event.new(*args) message = event.payload.respond_to?(:stringify_keys) ? event.payload.stringify_keys : event.payload @router.emit("fluent_kafka_stats.#{event.name}", Time.now.to_i, message) end end + + @topic_key_sym = @topic_key.to_sym end def multi_workers_ready? true end @@ -166,23 +172,23 @@ end # TODO: optimize write performance def write(chunk) tag = chunk.metadata.tag - topic = chunk.metadata.variables[:topic] || @default_topic || tag + topic = chunk.metadata.variables[@topic_key_sym] || @default_topic || tag producer = @kafka.topic_producer(topic, @producer_opts) messages = 0 record_buf = nil begin chunk.msgpack_each { |time, record| begin record = inject_values_to_record(tag, time, record) - record.delete('topic'.freeze) if @exclude_topic_key - partition_key = (@exclude_partition_key ? record.delete('partition_key'.freeze) : record['partition_key'.freeze]) || @default_partition_key - partition = (@exclude_partition ? record.delete('partition'.freeze) : record['partition'.freeze]) || @default_partition - message_key = (@exclude_message_key ? record.delete('message_key'.freeze) : record['message_key'.freeze]) || @default_message_key + record.delete(@topic_key) if @exclude_topic_key + partition_key = (@exclude_partition_key ? record.delete(@partition_key_key) : record[@partition_key_key]) || @default_partition_key + partition = (@exclude_partition ? record.delete(@partition_key) : record[@partition_key]) || @default_partition + message_key = (@exclude_message_key ? record.delete(@message_key) : record[@message_key]) || @default_message_key record_buf = @formatter_proc.call(tag, time, record) rescue StandardError => e log.warn "unexpected error during format record. Skip broken event:", :error => e.to_s, :error_class => e.class.to_s, :time => time, :record => record next