lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.6.5 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.6.6
- old
+ new
@@ -13,14 +13,18 @@
config_param :brokers, :array, :value_type => :string, :default => ['localhost:9092'],
:desc => <<-DESC
Set brokers directly:
<broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
DESC
+ config_param :topic_key, :string, :default => 'topic', :desc => "Field for kafka topic"
config_param :default_topic, :string, :default => nil,
:desc => "Default output topic when record doesn't have topic field"
+ config_param :message_key_key, :string, :default => 'message_key', :desc => "Field for kafka message key"
config_param :default_message_key, :string, :default => nil
+ config_param :partition_key_key, :string, :default => 'partition_key', :desc => "Field for kafka partition key"
config_param :default_partition_key, :string, :default => nil
+ config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition"
config_param :default_partition, :integer, :default => nil
config_param :client_id, :string, :default => 'fluentd'
config_param :exclude_partition_key, :bool, :default => false,
:desc => 'Set true to remove partition key from data'
config_param :exclude_partition, :bool, :default => false,
@@ -121,10 +125,12 @@
event = ActiveSupport::Notifications::Event.new(*args)
message = event.payload.respond_to?(:stringify_keys) ? event.payload.stringify_keys : event.payload
@router.emit("fluent_kafka_stats.#{event.name}", Time.now.to_i, message)
end
end
+
+ @topic_key_sym = @topic_key.to_sym
end
def multi_workers_ready?
true
end
@@ -166,23 +172,23 @@
end
# TODO: optimize write performance
def write(chunk)
tag = chunk.metadata.tag
- topic = chunk.metadata.variables[:topic] || @default_topic || tag
+ topic = chunk.metadata.variables[@topic_key_sym] || @default_topic || tag
producer = @kafka.topic_producer(topic, @producer_opts)
messages = 0
record_buf = nil
begin
chunk.msgpack_each { |time, record|
begin
record = inject_values_to_record(tag, time, record)
- record.delete('topic'.freeze) if @exclude_topic_key
- partition_key = (@exclude_partition_key ? record.delete('partition_key'.freeze) : record['partition_key'.freeze]) || @default_partition_key
- partition = (@exclude_partition ? record.delete('partition'.freeze) : record['partition'.freeze]) || @default_partition
- message_key = (@exclude_message_key ? record.delete('message_key'.freeze) : record['message_key'.freeze]) || @default_message_key
+ record.delete(@topic_key) if @exclude_topic_key
+ partition_key = (@exclude_partition_key ? record.delete(@partition_key_key) : record[@partition_key_key]) || @default_partition_key
+ partition = (@exclude_partition ? record.delete(@partition_key) : record[@partition_key]) || @default_partition
+ message_key = (@exclude_message_key ? record.delete(@message_key) : record[@message_key]) || @default_message_key
record_buf = @formatter_proc.call(tag, time, record)
rescue StandardError => e
log.warn "unexpected error during format record. Skip broken event:", :error => e.to_s, :error_class => e.class.to_s, :time => time, :record => record
next