lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.4.1 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.5.0
- old
+ new
@@ -6,11 +6,11 @@
module Fluent::Plugin
class MongoOutput < Output
Fluent::Plugin.register_output('mongo', self)
- helpers :event_emitter, :inject, :compat_parameters
+ helpers :event_emitter, :inject, :compat_parameters, :record_accessor
include Fluent::MongoAuthParams
include Fluent::MongoAuth
include Fluent::LoggerSupport
@@ -39,11 +39,14 @@
config_param :replace_dollar_in_key_with, :string, default: nil
# Additional date field to be used to Date object
desc "Specify keys to use MongoDB's Date. Supported value types are Integer/Float/EventTime/String"
config_param :date_keys, :array, default: nil
+ desc "Specify if the fields in date_keys are of type Integer or Float"
+ config_param :parse_string_number_date, :bool, default: false
+
# tag mapping mode
desc "Use tag_mapped mode"
config_param :tag_mapped, :bool, default: false,
deprecated: "use '${tag}' placeholder in collection parameter."
desc "Remove tag prefix"
@@ -73,10 +76,11 @@
super
@nodes = nil
@client_options = {}
@collection_options = {capped: false}
+ @accessors = {}
end
# Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.
LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2
LIMIT_AFTER_v1_8 = 8 * 1024 * 1024 # 8MB = 16MB / 2
@@ -155,10 +159,17 @@
@nodes = ["#{@host}:#{@port}"] if @nodes.nil?
configure_logger(@mongo_log_level)
log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}"
+
+ if @date_keys
+ @date_keys.each { |field_name|
+ @accessors[field_name.to_s] = record_accessor_create(field_name)
+ }
+ log.debug "Setup record accessor for every date key"
+ end
end
def start
@client = client
@client = authenticate(@client)
@@ -209,31 +220,49 @@
record = inject_values_to_record(tag, time, record)
# MongoDB uses BSON's Date for time.
record[time_key] = Time.at(time || record[time_key]) if time_key
if date_keys
- date_keys.each { |date_key|
+ @accessors.each_pair { |date_key, date_key_accessor|
begin
- date_value = record[date_key]
+ date_value = date_key_accessor.call(record)
case date_value
when Fluent::EventTime
- record[date_key] = date_value.to_time
+ value_to_set = date_value.to_time
when Integer
- record[date_key] = if date_value > 9999999999
+ value_to_set = if date_value > 9999999999
+ # epoch with milliseconds: e.g. javascript
+ Time.at(date_value / 1000.0)
+ else
+ # epoch with seconds: e.g. ruby
+ Time.at(date_value)
+ end
+ when Float
+ value_to_set = Time.at(date_value)
+ else
+ if @parse_string_number_date
+ if date_value.to_i.to_s == date_value
+ date_value = date_value.to_i
+ value_to_set = if date_value > 9999999999
# epoch with milliseconds: e.g. javascript
- Time.at(date_value / 1000.0)
+ date_value / 1000.0
else
# epoch with seconds: e.g. ruby
- Time.at(date_value)
+ date_value
end
- when Float
- record[date_key] = Time.at(date_value)
- else
- record[date_key] = Time.parse(date_value)
+ elsif date_value.to_f.to_s == date_value
+ date_value = date_value.to_f
+ end
+ value_to_set = date_value.is_a?(String) ? Time.parse(date_value) : Time.at(date_value)
+ else
+ value_to_set = Time.parse(date_value)
+ end
end
+
+ date_key_accessor.set(record, value_to_set)
rescue ArgumentError
- log.warn "Failed to parse '#{date_key}' field. Expected date types are Integer/Float/String/EventTime: #{record[date_key]}"
- record[date_key] = nil
+ log.warn "Failed to parse '#{date_key}' field. Expected date types are Integer/Float/String/EventTime: #{date_value}"
+ date_key_accessor.set(record, nil)
end
}
end
records << record
}