lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.5.0 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.6.0
- old
+ new
@@ -41,12 +41,13 @@
# Additional date field to be used to Date object
desc "Specify keys to use MongoDB's Date. Supported value types are Integer/Float/EventTime/String"
config_param :date_keys, :array, default: nil
desc "Specify if the fields in date_keys are of type Integer or Float"
config_param :parse_string_number_date, :bool, default: false
+ desc "Specify keys to use MongoDB's ObjectId"
+ config_param :object_id_keys, :array, default: nil
-
# tag mapping mode
desc "Use tag_mapped mode"
config_param :tag_mapped, :bool, default: false,
deprecated: "use '${tag}' placeholder in collection parameter."
desc "Remove tag prefix"
@@ -76,11 +77,12 @@
super
@nodes = nil
@client_options = {}
@collection_options = {capped: false}
- @accessors = {}
+ @date_accessors = {}
+ @object_id_accessors = {}
end
# Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.
LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2
LIMIT_AFTER_v1_8 = 8 * 1024 * 1024 # 8MB = 16MB / 2
@@ -162,14 +164,20 @@
log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}"
if @date_keys
@date_keys.each { |field_name|
- @accessors[field_name.to_s] = record_accessor_create(field_name)
+ @date_accessors[field_name.to_s] = record_accessor_create(field_name)
}
log.debug "Setup record accessor for every date key"
end
+ if @object_id_keys
+ @object_id_keys.each { |field_name|
+ @object_id_accessors[field_name.to_s] = record_accessor_create(field_name)
+ }
+ log.debug "Setup record accessor for every object_id key"
+ end
end
def start
@client = client
@client = authenticate(@client)
@@ -212,19 +220,20 @@
def collect_records(chunk)
records = []
time_key = @inject_config.time_key if @inject_config
date_keys = @date_keys
+ object_id_keys = @object_id_keys
tag = chunk.metadata.tag
chunk.msgpack_each {|time, record|
record = inject_values_to_record(tag, time, record)
# MongoDB uses BSON's Date for time.
record[time_key] = Time.at(time || record[time_key]) if time_key
if date_keys
- @accessors.each_pair { |date_key, date_key_accessor|
+ @date_accessors.each_pair { |date_key, date_key_accessor|
begin
date_value = date_key_accessor.call(record)
case date_value
when Fluent::EventTime
value_to_set = date_value.to_time
@@ -260,9 +269,21 @@
date_key_accessor.set(record, value_to_set)
rescue ArgumentError
log.warn "Failed to parse '#{date_key}' field. Expected date types are Integer/Float/String/EventTime: #{date_value}"
date_key_accessor.set(record, nil)
+ end
+ }
+ end
+ if object_id_keys
+ @object_id_accessors.each_pair { |object_id_key, object_id_key_accessor|
+ begin
+ object_id_value = object_id_key_accessor.call(record)
+ value_to_set = BSON::ObjectId(object_id_value)
+ object_id_key_accessor.set(record, value_to_set)
+ rescue BSON::ObjectId::Invalid
+ log.warn "Failed to parse '#{object_id_key}' field. Expected object_id types are String: #{object_id_value}"
+ object_id_key_accessor.set(record, nil)
end
}
end
records << record
}