lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.12.0 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.12.1
- old
+ new
@@ -33,14 +33,17 @@
config_param :add_offset_in_record, :bool, :default => false
config_param :offset_zookeeper, :string, :default => nil
config_param :offset_zk_root_node, :string, :default => '/fluent-plugin-kafka'
config_param :use_record_time, :bool, :default => false,
- :desc => "Replace message timestamp with contents of 'time' field."
+ :desc => "Replace message timestamp with contents of 'time' field.",
+ :deprecated => "Use 'time_source record' instead."
+ config_param :time_source, :enum, :list => [:now, :kafka, :record], :default => :now,
+ :desc => "Source for message timestamp."
config_param :get_kafka_client_log, :bool, :default => false
config_param :time_format, :string, :default => nil,
- :desc => "Time format to be used to parse 'time' filed."
+ :desc => "Time format to be used to parse 'time' field."
config_param :kafka_message_key, :string, :default => nil,
:desc => "Set kafka's message key to this field"
# Kafka#fetch_messages options
config_param :max_bytes, :integer, :default => nil,
@@ -108,11 +111,13 @@
require 'zookeeper' if @offset_zookeeper
@parser_proc = setup_parser
- if @use_record_time and @time_format
+ @time_source = :record if @use_record_time
+
+ if @time_source == :record and @time_format
if defined?(Fluent::TimeParser)
@time_parser = Fluent::TimeParser.new(@time_format)
else
@time_parser = Fluent::TextParser::TimeParser.new(@time_format)
end
@@ -204,10 +209,11 @@
@add_prefix,
@add_suffix,
offset_manager,
router,
@kafka_message_key,
+ @time_source,
opt)
}
@topic_watchers.each {|tw|
tw.attach(@loop)
}
@@ -228,21 +234,22 @@
$log.error "unexpected error", :error => e.to_s
$log.error_backtrace
end
class TopicWatcher < Coolio::TimerWatcher
- def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, options={})
+ def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, time_source, options={})
@topic_entry = topic_entry
@kafka = kafka
@callback = method(:consume)
@parser = parser
@add_prefix = add_prefix
@add_suffix = add_suffix
@options = options
@offset_manager = offset_manager
@router = router
@kafka_message_key = kafka_message_key
+ @time_source = time_source
@next_offset = @topic_entry.offset
if @topic_entry.offset == -1 && offset_manager
@next_offset = offset_manager.next_offset
end
@@ -275,17 +282,22 @@
tag = tag + "." + @add_suffix if @add_suffix
messages.each { |msg|
begin
record = @parser.call(msg, @topic_entry)
- if @use_record_time
+ case @time_source
+ when :kafka
+ record_time = Fluent::EventTime.from_time(msg.create_time)
+ when :now
+ record_time = Fluent::Engine.now
+ when :record
if @time_format
record_time = @time_parser.parse(record['time'])
else
record_time = record['time']
end
else
- record_time = Fluent::Engine.now
+ $log.fatal "BUG: invalid time_source: #{@time_source}"
end
if @kafka_message_key
record[@kafka_message_key] = msg.key
end
es.add(record_time, record)