lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.3.0.rc1 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.3.0

- old
+ new

@@ -1,13 +1,11 @@
 require 'fluent/input'
 require 'fluent/plugin/kafka_plugin_util'
 
-module Fluent
+class Fluent::KafkaInput < Fluent::Input
+  Fluent::Plugin.register_input('kafka', self)
-
-class KafkaInput < Input
-  Plugin.register_input('kafka', self)
 
   config_param :format, :string, :default => 'json',
                :desc => "Supported format: (json|text|ltsv|msgpack)"
   config_param :message_key, :string, :default => 'message',
                :desc => "For 'text' format only."
   config_param :host, :string, :default => nil,
@@ -39,14 +37,12 @@
                :desc => "Maximum number of bytes to fetch."
   config_param :max_wait_time, :integer, :default => nil,
                :desc => "How long to block until the server sends us data."
   config_param :min_bytes, :integer, :default => nil,
                :desc => "Smallest amount of data the server should send us."
-  config_param :socket_timeout_ms, :integer, :default => nil,
-               :desc => "How long to wait for reply from server. Should be higher than max_wait_ms."
 
-  include KafkaPluginUtil::SSLSettings
+  include Fluent::KafkaPluginUtil::SSLSettings
 
   unless method_defined?(:router)
     define_method("router") { Fluent::Engine }
   end
 
@@ -64,20 +60,20 @@
         TopicEntry.new(topic.strip, @partition, @offset)
       }
     else
       conf.elements.select { |element| element.name == 'topic' }.each do |element|
         unless element.has_key?('topic')
-          raise ConfigError, "kafka: 'topic' is a require parameter in 'topic element'."
+          raise Fluent::ConfigError, "kafka: 'topic' is a require parameter in 'topic element'."
         end
         partition = element.has_key?('partition') ? element['partition'].to_i : 0
         offset = element.has_key?('offset') ? element['offset'].to_i : -1
         @topic_list.push(TopicEntry.new(element['topic'], partition, offset))
       end
     end
 
     if @topic_list.empty?
-      raise ConfigError, "kafka: 'topics' or 'topic element' is a require parameter"
+      raise Fluent::ConfigError, "kafka: 'topics' or 'topic element' is a require parameter"
     end
 
     # For backward compatibility
     @brokers = case
                when @host && @port
@@ -226,18 +222,18 @@
       @fetch_args[:offset] = offset
       messages = @kafka.fetch_messages(@fetch_args)
 
       return if messages.size.zero?
 
-      es = MultiEventStream.new
+      es = Fluent::MultiEventStream.new
       tag = @topic_entry.topic
       tag = @add_prefix + "." + tag if @add_prefix
       tag = tag + "." + @add_suffix if @add_suffix
 
       messages.each { |msg|
         begin
-          es.add(Engine.now, @parser.call(msg, @topic_entry))
+          es.add(Fluent::Engine.now, @parser.call(msg, @topic_entry))
         rescue => e
           $log.warn "parser error in #{@topic_entry.topic}/#{@topic_entry.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
           $log.debug_backtrace
         end
       }
@@ -286,8 +282,6 @@
     def save_offset(offset)
       @zookeeper.set(:path => @zk_path, :data => offset.to_s)
       $log.trace "update zk offset node : #{offset.to_s}"
     end
   end
-end
-
 end