lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.4.1 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.5.0

- old
+ new

@@ -6,11 +6,11 @@ module Fluent::Plugin class MongoOutput < Output Fluent::Plugin.register_output('mongo', self) - helpers :event_emitter, :inject, :compat_parameters + helpers :event_emitter, :inject, :compat_parameters, :record_accessor include Fluent::MongoAuthParams include Fluent::MongoAuth include Fluent::LoggerSupport @@ -39,11 +39,14 @@ config_param :replace_dollar_in_key_with, :string, default: nil # Additional date field to be used to Date object desc "Specify keys to use MongoDB's Date. Supported value types are Integer/Float/EventTime/String" config_param :date_keys, :array, default: nil + desc "Specify if the fields in date_keys are of type Integer or Float" + config_param :parse_string_number_date, :bool, default: false + # tag mapping mode desc "Use tag_mapped mode" config_param :tag_mapped, :bool, default: false, deprecated: "use '${tag}' placeholder in collection parameter." desc "Remove tag prefix" @@ -73,10 +76,11 @@ super @nodes = nil @client_options = {} @collection_options = {capped: false} + @accessors = {} end # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON. LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2 LIMIT_AFTER_v1_8 = 8 * 1024 * 1024 # 8MB = 16MB / 2 @@ -155,10 +159,17 @@ @nodes = ["#{@host}:#{@port}"] if @nodes.nil? configure_logger(@mongo_log_level) log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}" + + if @date_keys + @date_keys.each { |field_name| + @accessors[field_name.to_s] = record_accessor_create(field_name) + } + log.debug "Setup record accessor for every date key" + end end def start @client = client @client = authenticate(@client) @@ -209,31 +220,49 @@ record = inject_values_to_record(tag, time, record) # MongoDB uses BSON's Date for time. record[time_key] = Time.at(time || record[time_key]) if time_key if date_keys - date_keys.each { |date_key| + @accessors.each_pair { |date_key, date_key_accessor| begin - date_value = record[date_key] + date_value = date_key_accessor.call(record) case date_value when Fluent::EventTime - record[date_key] = date_value.to_time + value_to_set = date_value.to_time when Integer - record[date_key] = if date_value > 9999999999 + value_to_set = if date_value > 9999999999 + # epoch with milliseconds: e.g. javascript + Time.at(date_value / 1000.0) + else + # epoch with seconds: e.g. ruby + Time.at(date_value) + end + when Float + value_to_set = Time.at(date_value) + else + if @parse_string_number_date + if date_value.to_i.to_s == date_value + date_value = date_value.to_i + value_to_set = if date_value > 9999999999 # epoch with milliseconds: e.g. javascript - Time.at(date_value / 1000.0) + date_value / 1000.0 else # epoch with seconds: e.g. ruby - Time.at(date_value) + date_value end - when Float - record[date_key] = Time.at(date_value) - else - record[date_key] = Time.parse(date_value) + elsif date_value.to_f.to_s == date_value + date_value = date_value.to_f + end + value_to_set = date_value.is_a?(String) ? Time.parse(date_value) : Time.at(date_value) + else + value_to_set = Time.parse(date_value) + end end + + date_key_accessor.set(record, value_to_set) rescue ArgumentError - log.warn "Failed to parse '#{date_key}' field. Expected date types are Integer/Float/String/EventTime: #{record[date_key]}" - record[date_key] = nil + log.warn "Failed to parse '#{date_key}' field. Expected date types are Integer/Float/String/EventTime: #{date_value}" + date_key_accessor.set(record, nil) end } end records << record }