lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.12.0 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.12.1

- old
+ new

@@ -33,14 +33,17 @@ config_param :add_offset_in_record, :bool, :default => false config_param :offset_zookeeper, :string, :default => nil config_param :offset_zk_root_node, :string, :default => '/fluent-plugin-kafka' config_param :use_record_time, :bool, :default => false, - :desc => "Replace message timestamp with contents of 'time' field." + :desc => "Replace message timestamp with contents of 'time' field.", + :deprecated => "Use 'time_source record' instead." + config_param :time_source, :enum, :list => [:now, :kafka, :record], :default => :now, + :desc => "Source for message timestamp." config_param :get_kafka_client_log, :bool, :default => false config_param :time_format, :string, :default => nil, - :desc => "Time format to be used to parse 'time' filed." + :desc => "Time format to be used to parse 'time' field." config_param :kafka_message_key, :string, :default => nil, :desc => "Set kafka's message key to this field" # Kafka#fetch_messages options config_param :max_bytes, :integer, :default => nil, @@ -108,11 +111,13 @@ require 'zookeeper' if @offset_zookeeper @parser_proc = setup_parser - if @use_record_time and @time_format + @time_source = :record if @use_record_time + + if @time_source == :record and @time_format if defined?(Fluent::TimeParser) @time_parser = Fluent::TimeParser.new(@time_format) else @time_parser = Fluent::TextParser::TimeParser.new(@time_format) end @@ -204,10 +209,11 @@ @add_prefix, @add_suffix, offset_manager, router, @kafka_message_key, + @time_source, opt) } @topic_watchers.each {|tw| tw.attach(@loop) } @@ -228,21 +234,22 @@ $log.error "unexpected error", :error => e.to_s $log.error_backtrace end class TopicWatcher < Coolio::TimerWatcher - def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, options={}) + def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, time_source, options={}) @topic_entry = topic_entry @kafka = kafka @callback = method(:consume) @parser = parser @add_prefix = add_prefix @add_suffix = add_suffix @options = options @offset_manager = offset_manager @router = router @kafka_message_key = kafka_message_key + @time_source = time_source @next_offset = @topic_entry.offset if @topic_entry.offset == -1 && offset_manager @next_offset = offset_manager.next_offset end @@ -275,17 +282,22 @@ tag = tag + "." + @add_suffix if @add_suffix messages.each { |msg| begin record = @parser.call(msg, @topic_entry) - if @use_record_time + case @time_source + when :kafka + record_time = Fluent::EventTime.from_time(msg.create_time) + when :now + record_time = Fluent::Engine.now + when :record if @time_format record_time = @time_parser.parse(record['time']) else record_time = record['time'] end else - record_time = Fluent::Engine.now + $log.fatal "BUG: invalid time_source: #{@time_source}" end if @kafka_message_key record[@kafka_message_key] = msg.key end es.add(record_time, record)