lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.0.15 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.0.16

- old
+ new

@@ -2,10 +2,11 @@ class KafkaInput < Input Plugin.register_input('kafka', self) config_param :format, :string, :default => 'json' # (json|text|ltsv) + config_param :message_key, :string, :default => 'message' # for 'text' format only config_param :host, :string, :default => 'localhost' config_param :port, :integer, :default => 9092 config_param :interval, :integer, :default => 1 # seconds config_param :topics, :string config_param :client_id, :string, :default => 'kafka' @@ -49,11 +50,11 @@ opt[:max_wait_ms] = @max_wait_ms if @max_wait_ms opt[:min_bytes] = @min_bytes if @min_bytes opt[:socket_timeout_ms] = @socket_timeout_ms if @socket_timeout_ms @topic_watchers = @topic_list.map {|topic| - TopicWatcher.new(topic, @host, @port, @client_id, @partition, @offset, interval, @format, @add_prefix, @add_suffix, opt) + TopicWatcher.new(topic, @host, @port, @client_id, @partition, @offset, interval, @format, @message_key, @add_prefix, @add_suffix, opt) } @topic_watchers.each {|tw| tw.attach(@loop) } @thread = Thread.new(&method(:run)) @@ -69,14 +70,15 @@ $log.error "unexpected error", :error=>$!.to_s $log.error_backtrace end class TopicWatcher < Coolio::TimerWatcher - def initialize(topic, host, port, client_id, partition, offset, interval, format, add_prefix, add_suffix, options={}) + def initialize(topic, host, port, client_id, partition, offset, interval, format, message_key, add_prefix, add_suffix, options={}) @topic = topic @callback = method(:consume) @format = format + @message_key = message_key @add_prefix = add_prefix @add_suffix = add_suffix @consumer = Poseidon::PartitionConsumer.new( client_id, # client_id host, # host @@ -126,10 +128,10 @@ when 'ltsv' parsed_record = LTSV.parse(record) when 'msgpack' parsed_record = MessagePack.unpack(record) when 'text' - parsed_record = record + parsed_record[@message_key] = record end parsed_record end end end