lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.0.5 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.0.6
- old
+ new
@@ -1,11 +1,11 @@
module Fluent
class KafkaInput < Input
Plugin.register_input('kafka', self)
- config_param :format, :string, :default => 'json' # (json|text)
+ config_param :format, :string, :default => 'json' # (json|text|ltsv)
config_param :host, :string, :default => 'localhost'
config_param :port, :integer, :default => 2181
config_param :interval, :integer, :default => 1 # seconds
config_param :topics, :string
config_param :client_id, :string, :default => 'kafka'
@@ -26,11 +26,13 @@
raise ConfigError, "kafka: 'topics' is a require parameter"
end
case @format
when 'json'
- require 'json'
+ require 'yajl'
+ when 'ltsv'
+ require 'ltsv'
end
end
def start
@loop = Coolio::Loop.new
@@ -103,10 +105,12 @@
def parse_line(record)
parsed_record = {}
case @format
when 'json'
- parsed_record = JSON.parse(record)
+ parsed_record = Yajl::Parser.parse(record)
+ when 'ltsv'
+ parsed_record = LTSV.parse(record)
when 'text'
parsed_record = record
end
parsed_record
end