lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.0.15 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.0.16
- old
+ new
@@ -2,10 +2,11 @@
class KafkaInput < Input
Plugin.register_input('kafka', self)
config_param :format, :string, :default => 'json' # (json|text|ltsv)
+ config_param :message_key, :string, :default => 'message' # for 'text' format only
config_param :host, :string, :default => 'localhost'
config_param :port, :integer, :default => 9092
config_param :interval, :integer, :default => 1 # seconds
config_param :topics, :string
config_param :client_id, :string, :default => 'kafka'
@@ -49,11 +50,11 @@
opt[:max_wait_ms] = @max_wait_ms if @max_wait_ms
opt[:min_bytes] = @min_bytes if @min_bytes
opt[:socket_timeout_ms] = @socket_timeout_ms if @socket_timeout_ms
@topic_watchers = @topic_list.map {|topic|
- TopicWatcher.new(topic, @host, @port, @client_id, @partition, @offset, interval, @format, @add_prefix, @add_suffix, opt)
+ TopicWatcher.new(topic, @host, @port, @client_id, @partition, @offset, interval, @format, @message_key, @add_prefix, @add_suffix, opt)
}
@topic_watchers.each {|tw|
tw.attach(@loop)
}
@thread = Thread.new(&method(:run))
@@ -69,14 +70,15 @@
$log.error "unexpected error", :error=>$!.to_s
$log.error_backtrace
end
class TopicWatcher < Coolio::TimerWatcher
- def initialize(topic, host, port, client_id, partition, offset, interval, format, add_prefix, add_suffix, options={})
+ def initialize(topic, host, port, client_id, partition, offset, interval, format, message_key, add_prefix, add_suffix, options={})
@topic = topic
@callback = method(:consume)
@format = format
+ @message_key = message_key
@add_prefix = add_prefix
@add_suffix = add_suffix
@consumer = Poseidon::PartitionConsumer.new(
client_id, # client_id
host, # host
@@ -126,10 +128,10 @@
when 'ltsv'
parsed_record = LTSV.parse(record)
when 'msgpack'
parsed_record = MessagePack.unpack(record)
when 'text'
- parsed_record = record
+ parsed_record[@message_key] = record
end
parsed_record
end
end
end