lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.4.2.rc1 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.4.2
- old
+ new
@@ -1,6 +1,7 @@
require 'fluent/input'
+require 'fluent/time'
require 'fluent/plugin/kafka_plugin_util'
class Fluent::KafkaInput < Fluent::Input
Fluent::Plugin.register_input('kafka', self)
@@ -29,10 +30,14 @@
:desc => "tag suffix"
config_param :add_offset_in_record, :bool, :default => false
config_param :offset_zookeeper, :string, :default => nil
config_param :offset_zk_root_node, :string, :default => '/fluent-plugin-kafka'
+ config_param :use_record_time, :bool, :default => false,
+ :desc => "Replace message timestamp with contents of 'time' field."
+ config_param :time_format, :string, :default => nil,
+ :desc => "Time format to be used to parse 'time' filed."
# Kafka#fetch_messages options
config_param :max_bytes, :integer, :default => nil,
:desc => "Maximum number of bytes to fetch."
config_param :max_wait_time, :integer, :default => nil,
@@ -47,10 +52,12 @@
end
def initialize
super
require 'kafka'
+
+ @time_parser = nil
end
def configure(conf)
super
@@ -94,10 +101,14 @@
@max_wait_time = @interval if @max_wait_time.nil?
require 'zookeeper' if @offset_zookeeper
@parser_proc = setup_parser
+
+ if @use_record_time and @time_format
+ @time_parser = Fluent::TextParser::TimeParser.new(@time_format)
+ end
end
def setup_parser
case @format
when 'json'
@@ -229,10 +240,20 @@
tag = @add_prefix + "." + tag if @add_prefix
tag = tag + "." + @add_suffix if @add_suffix
messages.each { |msg|
begin
- es.add(Fluent::Engine.now, @parser.call(msg, @topic_entry))
+ record = @parser.call(msg, @topic_entry)
+ if @use_record_time
+ if @time_format
+ record_time = @time_parser.parse(record['time'])
+ else
+ record_time = record['time']
+ end
+ else
+ record_time = Fluent::Engine.now
+ end
+ es.add(record_time, record)
rescue => e
$log.warn "parser error in #{@topic_entry.topic}/#{@topic_entry.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
$log.debug_backtrace
end
}