lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.0.5 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.0.6

- old
+ new

@@ -1,11 +1,11 @@ module Fluent class KafkaInput < Input Plugin.register_input('kafka', self) - config_param :format, :string, :default => 'json' # (json|text) + config_param :format, :string, :default => 'json' # (json|text|ltsv) config_param :host, :string, :default => 'localhost' config_param :port, :integer, :default => 2181 config_param :interval, :integer, :default => 1 # seconds config_param :topics, :string config_param :client_id, :string, :default => 'kafka' @@ -26,11 +26,13 @@ raise ConfigError, "kafka: 'topics' is a require parameter" end case @format when 'json' - require 'json' + require 'yajl' + when 'ltsv' + require 'ltsv' end end def start @loop = Coolio::Loop.new @@ -103,10 +105,12 @@ def parse_line(record) parsed_record = {} case @format when 'json' - parsed_record = JSON.parse(record) + parsed_record = Yajl::Parser.parse(record) + when 'ltsv' + parsed_record = LTSV.parse(record) when 'text' parsed_record = record end parsed_record end