lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.2.2 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.3.0.rc1

- old
+ new

@@ -1,19 +1,23 @@
 require 'fluent/input'
+require 'fluent/plugin/kafka_plugin_util'
+
 module Fluent

 class KafkaInput < Input
   Plugin.register_input('kafka', self)

   config_param :format, :string, :default => 'json',
                :desc => "Supported format: (json|text|ltsv|msgpack)"
   config_param :message_key, :string, :default => 'message',
                :desc => "For 'text' format only."
-  config_param :host, :string, :default => 'localhost',
+  config_param :host, :string, :default => nil,
                :desc => "Broker host"
-  config_param :port, :integer, :default => 9092,
+  config_param :port, :integer, :default => nil,
                :desc => "Broker port"
+  config_param :brokers, :string, :default => 'localhost:9092',
+               :desc => "List of broker-host:port, separate with comma, must set."
   config_param :interval, :integer, :default => 1, # seconds
                :desc => "Interval (Unit: seconds)"
   config_param :topics, :string, :default => nil,
                :desc => "Listening topics(separate with comma',')"
   config_param :client_id, :string, :default => 'kafka'
@@ -28,28 +32,29 @@
   config_param :add_offset_in_record, :bool, :default => false

   config_param :offset_zookeeper, :string, :default => nil
   config_param :offset_zk_root_node, :string, :default => '/fluent-plugin-kafka'

-  # poseidon PartitionConsumer options
+  # Kafka#fetch_messages options
   config_param :max_bytes, :integer, :default => nil,
                :desc => "Maximum number of bytes to fetch."
-  config_param :max_wait_ms, :integer, :default => nil,
+  config_param :max_wait_time, :integer, :default => nil,
                :desc => "How long to block until the server sends us data."
   config_param :min_bytes, :integer, :default => nil,
                :desc => "Smallest amount of data the server should send us."
   config_param :socket_timeout_ms, :integer, :default => nil,
                :desc => "How long to wait for reply from server. Should be higher than max_wait_ms."

+  include KafkaPluginUtil::SSLSettings
+
   unless method_defined?(:router)
     define_method("router") { Fluent::Engine }
   end

   def initialize
     super
-    require 'poseidon'
-    require 'zookeeper'
+    require 'kafka'
   end

   def configure(conf)
     super
@@ -71,42 +76,94 @@
     if @topic_list.empty?
       raise ConfigError, "kafka: 'topics' or 'topic element' is a require parameter"
     end

+    # For backward compatibility
+    @brokers = case
+               when @host && @port
+                 ["#{@host}:#{@port}"]
+               when @host
+                 ["#{@host}:9092"]
+               when @port
+                 ["localhost:#{@port}"]
+               else
+                 @brokers
+               end
+
+    if conf['max_wait_ms']
+      log.warn "'max_wait_ms' parameter is deprecated. Use second unit 'max_wait_time' instead"
+      @max_wait_time = conf['max_wait_ms'].to_i / 1000
+    end
+
+    @max_wait_time = @interval if @max_wait_time.nil?
+
+    require 'zookeeper' if @offset_zookeeper
+
+    @parser_proc = setup_parser
+  end
+
+  def setup_parser
     case @format
     when 'json'
       require 'yajl'
+      Proc.new { |msg, te|
+        r = Yajl::Parser.parse(msg.value)
+        add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
+        r
+      }
     when 'ltsv'
       require 'ltsv'
+      Proc.new { |msg, te|
+        r = LTSV.parse(msg.value).first
+        add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
+        r
+      }
     when 'msgpack'
       require 'msgpack'
+      Proc.new { |msg, te|
+        r = MessagePack.unpack(msg.value)
+        add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
+        r
+      }
+    when 'text'
+      Proc.new { |msg, te|
+        r = {@message_key => msg.value}
+        add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
+        r
+      }
     end
   end

+  def add_offset_in_hash(hash, te, offset)
+    hash['kafka_topic'.freeze] = te.topic
+    hash['kafka_partition'.freeze] = te.partition
+    hash['kafka_offset'.freeze] = offset
+  end
+
   def start
     super
+
     @loop = Coolio::Loop.new
     opt = {}
     opt[:max_bytes] = @max_bytes if @max_bytes
-    opt[:max_wait_ms] = @max_wait_ms if @max_wait_ms
+    opt[:max_wait_time] = @max_wait_time if @max_wait_time
     opt[:min_bytes] = @min_bytes if @min_bytes
-    opt[:socket_timeout_ms] = @socket_timeout_ms if @socket_timeout_ms
+    @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id,
+                       ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+                       ssl_client_cert: read_ssl_file(@ssl_client_cert),
+                       ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key))

     @zookeeper = Zookeeper.new(@offset_zookeeper) if @offset_zookeeper

     @topic_watchers = @topic_list.map {|topic_entry|
       offset_manager = OffsetManager.new(topic_entry, @zookeeper, @offset_zk_root_node) if @offset_zookeeper
       TopicWatcher.new(
         topic_entry,
-        @host,
-        @port,
-        @client_id,
+        @kafka,
         interval,
-        @format,
-        @message_key,
-        @add_offset_in_record,
+        @parser_proc,
         @add_prefix,
         @add_suffix,
         offset_manager,
         router,
         opt)
@@ -118,130 +175,83 @@
   end

   def shutdown
     @loop.stop
     @zookeeper.close! if @zookeeper
+    @thread.join
+    @kafka.close
     super
   end

   def run
     @loop.run
-  rescue
-    $log.error "unexpected error", :error=>$!.to_s
+  rescue => e
+    $log.error "unexpected error", :error => e.to_s
     $log.error_backtrace
   end

   class TopicWatcher < Coolio::TimerWatcher
-    def initialize(topic_entry, host, port, client_id, interval, format, message_key, add_offset_in_record, add_prefix, add_suffix, offset_manager, router, options={})
+    def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, options={})
       @topic_entry = topic_entry
-      @host = host
-      @port = port
-      @client_id = client_id
+      @kafka = kafka
       @callback = method(:consume)
-      @format = format
-      @message_key = message_key
-      @add_offset_in_record = add_offset_in_record
+      @parser = parser
       @add_prefix = add_prefix
       @add_suffix = add_suffix
       @options = options
       @offset_manager = offset_manager
       @router = router

       @next_offset = @topic_entry.offset
       if @topic_entry.offset == -1 && offset_manager
         @next_offset = offset_manager.next_offset
       end
-      @consumer = create_consumer(@next_offset)
+      @fetch_args = {
+        topic: @topic_entry.topic,
+        partition: @topic_entry.partition,
+      }.merge(@options)

       super(interval, true)
     end

     def on_timer
       @callback.call
-    rescue
-      # TODO log?
-      $log.error $!.to_s
-      $log.error_backtrace
+    rescue => e
+      # TODO log?
+      $log.error e.to_s
+      $log.error_backtrace
     end

     def consume
+      offset = @next_offset
+      @fetch_args[:offset] = offset
+      messages = @kafka.fetch_messages(@fetch_args)
+
+      return if messages.size.zero?
+
       es = MultiEventStream.new
       tag = @topic_entry.topic
       tag = @add_prefix + "." + tag if @add_prefix
       tag = tag + "." + @add_suffix if @add_suffix

-      if @offset_manager && @consumer.next_offset != @next_offset
-        @consumer = create_consumer(@next_offset)
-      end
-
-      @consumer.fetch.each { |msg|
+      messages.each { |msg|
         begin
-          msg_record = parse_line(msg.value)
-          msg_record = decorate_offset(msg_record, msg.offset) if @add_offset_in_record
-          es.add(Engine.now, msg_record)
-        rescue
-          $log.warn msg_record.to_s, :error=>$!.to_s
+          es.add(Engine.now, @parser.call(msg, @topic_entry))
+        rescue => e
+          $log.warn "parser error in #{@topic_entry.topic}/#{@topic_entry.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
           $log.debug_backtrace
         end
       }
+      offset = messages.last.offset + 1

       unless es.empty?
         @router.emit_stream(tag, es)

         if @offset_manager
-          next_offset = @consumer.next_offset
-          @offset_manager.save_offset(next_offset)
-          @next_offset = next_offset
+          @offset_manager.save_offset(offset)
         end
+        @next_offset = offset
       end
-    end
-
-    def create_consumer(offset)
-      @consumer.close if @consumer
-      Poseidon::PartitionConsumer.new(
-        @client_id,             # client_id
-        @host,                  # host
-        @port,                  # port
-        @topic_entry.topic,     # topic
-        @topic_entry.partition, # partition
-        offset,                 # offset
-        @options                # options
-      )
-    end
-
-    def parse_line(record)
-      case @format
-      when 'json'
-        Yajl::Parser.parse(record)
-      when 'ltsv'
-        LTSV.parse(record)
-      when 'msgpack'
-        MessagePack.unpack(record)
-      when 'text'
-        {@message_key => record}
-      end
-    end
-
-    def decorate_offset(record, offset)
-      case @format
-      when 'json'
-        add_offset_in_hash(record, @topic_entry.topic, @topic_entry.partition, offset)
-      when 'ltsv'
-        record.each { |line|
-          add_offset_in_hash(line, @topic_entry.topic, @topic_entry.partition, offset)
-        }
-      when 'msgpack'
-        add_offset_in_hash(record, @topic_entry.topic, @topic_entry.partition, offset)
-      when 'text'
-        add_offset_in_hash(record, @topic_entry.topic, @topic_entry.partition, offset)
-      end
-      record
-    end
-
-    def add_offset_in_hash(hash, topic, partition, offset)
-      hash['kafka_topic'] = topic
-      hash['kafka_partition'] = partition
-      hash['kafka_offset'] = offset
     end
   end

   class TopicEntry
     def initialize(topic, partition, offset)
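
The consumer path in this release replaces Poseidon::PartitionConsumer with the ruby-kafka client: the plugin builds a single Kafka instance from brokers (plus the optional SSL files), and TopicWatcher#consume polls each topic/partition with Kafka#fetch_messages, advancing its own offset. Below is a minimal standalone sketch of that fetch loop, assuming ruby-kafka is installed and a broker is reachable; the broker address, topic name, and client_id are illustrative placeholders, not values taken from the plugin.

require 'kafka'

# Illustrative connection values; the plugin reads brokers and client_id from its configuration.
kafka = Kafka.new(seed_brokers: ['localhost:9092'], client_id: 'in_kafka_sketch')

offset = :earliest
loop do
  # Same call shape as TopicWatcher#consume: one fetch per topic/partition from a known offset.
  messages = kafka.fetch_messages(topic: 'example_topic', partition: 0, offset: offset)

  messages.each { |msg| puts "#{msg.offset}: #{msg.value}" }

  # Advance past the last delivered message, as the plugin does with messages.last.offset + 1.
  offset = messages.last.offset + 1 unless messages.empty?
  sleep 1
end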