lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.4.2.rc1 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.4.2

- old
+ new

@@ -1,6 +1,7 @@ require 'fluent/input' +require 'fluent/time' require 'fluent/plugin/kafka_plugin_util' class Fluent::KafkaInput < Fluent::Input Fluent::Plugin.register_input('kafka', self) @@ -29,10 +30,14 @@ :desc => "tag suffix" config_param :add_offset_in_record, :bool, :default => false config_param :offset_zookeeper, :string, :default => nil config_param :offset_zk_root_node, :string, :default => '/fluent-plugin-kafka' + config_param :use_record_time, :bool, :default => false, + :desc => "Replace message timestamp with contents of 'time' field." + config_param :time_format, :string, :default => nil, + :desc => "Time format to be used to parse 'time' filed." # Kafka#fetch_messages options config_param :max_bytes, :integer, :default => nil, :desc => "Maximum number of bytes to fetch." config_param :max_wait_time, :integer, :default => nil, @@ -47,10 +52,12 @@ end def initialize super require 'kafka' + + @time_parser = nil end def configure(conf) super @@ -94,10 +101,14 @@ @max_wait_time = @interval if @max_wait_time.nil? require 'zookeeper' if @offset_zookeeper @parser_proc = setup_parser + + if @use_record_time and @time_format + @time_parser = Fluent::TextParser::TimeParser.new(@time_format) + end end def setup_parser case @format when 'json' @@ -229,10 +240,20 @@ tag = @add_prefix + "." + tag if @add_prefix tag = tag + "." + @add_suffix if @add_suffix messages.each { |msg| begin - es.add(Fluent::Engine.now, @parser.call(msg, @topic_entry)) + record = @parser.call(msg, @topic_entry) + if @use_record_time + if @time_format + record_time = @time_parser.parse(record['time']) + else + record_time = record['time'] + end + else + record_time = Fluent::Engine.now + end + es.add(record_time, record) rescue => e $log.warn "parser error in #{@topic_entry.topic}/#{@topic_entry.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset $log.debug_backtrace end }