lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.5.0 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.6.0

- old
+ new

@@ -41,12 +41,13 @@ # Additional date field to be used to Date object desc "Specify keys to use MongoDB's Date. Supported value types are Integer/Float/EventTime/String" config_param :date_keys, :array, default: nil desc "Specify if the fields in date_keys are of type Integer or Float" config_param :parse_string_number_date, :bool, default: false + desc "Specify keys to use MongoDB's ObjectId" + config_param :object_id_keys, :array, default: nil - # tag mapping mode desc "Use tag_mapped mode" config_param :tag_mapped, :bool, default: false, deprecated: "use '${tag}' placeholder in collection parameter." desc "Remove tag prefix" @@ -76,11 +77,12 @@ super @nodes = nil @client_options = {} @collection_options = {capped: false} - @accessors = {} + @date_accessors = {} + @object_id_accessors = {} end # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON. LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2 LIMIT_AFTER_v1_8 = 8 * 1024 * 1024 # 8MB = 16MB / 2 @@ -162,14 +164,20 @@ log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}" if @date_keys @date_keys.each { |field_name| - @accessors[field_name.to_s] = record_accessor_create(field_name) + @date_accessors[field_name.to_s] = record_accessor_create(field_name) } log.debug "Setup record accessor for every date key" end + if @object_id_keys + @object_id_keys.each { |field_name| + @object_id_accessors[field_name.to_s] = record_accessor_create(field_name) + } + log.debug "Setup record accessor for every object_id key" + end end def start @client = client @client = authenticate(@client) @@ -212,19 +220,20 @@ def collect_records(chunk) records = [] time_key = @inject_config.time_key if @inject_config date_keys = @date_keys + object_id_keys = @object_id_keys tag = chunk.metadata.tag chunk.msgpack_each {|time, record| record = inject_values_to_record(tag, time, record) # MongoDB uses BSON's Date for time. record[time_key] = Time.at(time || record[time_key]) if time_key if date_keys - @accessors.each_pair { |date_key, date_key_accessor| + @date_accessors.each_pair { |date_key, date_key_accessor| begin date_value = date_key_accessor.call(record) case date_value when Fluent::EventTime value_to_set = date_value.to_time @@ -260,9 +269,21 @@ date_key_accessor.set(record, value_to_set) rescue ArgumentError log.warn "Failed to parse '#{date_key}' field. Expected date types are Integer/Float/String/EventTime: #{date_value}" date_key_accessor.set(record, nil) + end + } + end + if object_id_keys + @object_id_accessors.each_pair { |object_id_key, object_id_key_accessor| + begin + object_id_value = object_id_key_accessor.call(record) + value_to_set = BSON::ObjectId(object_id_value) + object_id_key_accessor.set(record, value_to_set) + rescue BSON::ObjectId::Invalid + log.warn "Failed to parse '#{object_id_key}' field. Expected object_id types are String: #{object_id_value}" + object_id_key_accessor.set(record, nil) end } end records << record }