lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.3.0.rc1 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.3.0
- old
+ new
@@ -1,13 +1,11 @@
require 'fluent/input'
require 'fluent/plugin/kafka_plugin_util'
-module Fluent
+class Fluent::KafkaInput < Fluent::Input
+ Fluent::Plugin.register_input('kafka', self)
-class KafkaInput < Input
- Plugin.register_input('kafka', self)
-
config_param :format, :string, :default => 'json',
:desc => "Supported format: (json|text|ltsv|msgpack)"
config_param :message_key, :string, :default => 'message',
:desc => "For 'text' format only."
config_param :host, :string, :default => nil,
@@ -39,14 +37,12 @@
:desc => "Maximum number of bytes to fetch."
config_param :max_wait_time, :integer, :default => nil,
:desc => "How long to block until the server sends us data."
config_param :min_bytes, :integer, :default => nil,
:desc => "Smallest amount of data the server should send us."
- config_param :socket_timeout_ms, :integer, :default => nil,
- :desc => "How long to wait for reply from server. Should be higher than max_wait_ms."
- include KafkaPluginUtil::SSLSettings
+ include Fluent::KafkaPluginUtil::SSLSettings
unless method_defined?(:router)
define_method("router") { Fluent::Engine }
end
@@ -64,20 +60,20 @@
TopicEntry.new(topic.strip, @partition, @offset)
}
else
conf.elements.select { |element| element.name == 'topic' }.each do |element|
unless element.has_key?('topic')
- raise ConfigError, "kafka: 'topic' is a require parameter in 'topic element'."
+ raise Fluent::ConfigError, "kafka: 'topic' is a require parameter in 'topic element'."
end
partition = element.has_key?('partition') ? element['partition'].to_i : 0
offset = element.has_key?('offset') ? element['offset'].to_i : -1
@topic_list.push(TopicEntry.new(element['topic'], partition, offset))
end
end
if @topic_list.empty?
- raise ConfigError, "kafka: 'topics' or 'topic element' is a require parameter"
+ raise Fluent::ConfigError, "kafka: 'topics' or 'topic element' is a require parameter"
end
# For backward compatibility
@brokers = case
when @host && @port
@@ -226,18 +222,18 @@
@fetch_args[:offset] = offset
messages = @kafka.fetch_messages(@fetch_args)
return if messages.size.zero?
- es = MultiEventStream.new
+ es = Fluent::MultiEventStream.new
tag = @topic_entry.topic
tag = @add_prefix + "." + tag if @add_prefix
tag = tag + "." + @add_suffix if @add_suffix
messages.each { |msg|
begin
- es.add(Engine.now, @parser.call(msg, @topic_entry))
+ es.add(Fluent::Engine.now, @parser.call(msg, @topic_entry))
rescue => e
$log.warn "parser error in #{@topic_entry.topic}/#{@topic_entry.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
$log.debug_backtrace
end
}
@@ -286,8 +282,6 @@
def save_offset(offset)
@zookeeper.set(:path => @zk_path, :data => offset.to_s)
$log.trace "update zk offset node : #{offset.to_s}"
end
end
-end
-
end