lib/fluent/plugin/out_solr.rb in fluent-plugin-output-solr-1.0.0 vs lib/fluent/plugin/out_solr.rb in fluent-plugin-output-solr-1.0.1
- old
+ new
@@ -10,12 +10,13 @@
helpers :inject, :compat_parameters
DEFAULT_COLLECTION = 'collection1'
DEFAULT_IGNORE_UNDEFINED_FIELDS = false
+ DEFAULT_STRING_FIELD_VALUE_MAX_LENGTH = -1
DEFAULT_TAG_FIELD = 'tag'
- DEFAULT_TIMESTAMP_FIELD = 'event_timestamp'
+ DEFAULT_TIMESTAMP_FIELD = 'time'
DEFAULT_FLUSH_SIZE = 100
DEFAULT_BUFFER_TYPE = "memory"
DEFAULT_COMMIT_WITH_FLUSH = true
MODE_STANDALONE = 'Standalone'
@@ -34,17 +35,19 @@
config_param :defined_fields, :array, :default => nil,
:desc => 'The defined fields in the Solr schema.xml. If omitted, it will get fields via Solr Schema API.'
config_param :ignore_undefined_fields, :bool, :default => DEFAULT_IGNORE_UNDEFINED_FIELDS,
:desc => 'Ignore undefined fields in the Solr schema.xml.'
+ config_param :string_field_value_max_length, :integer, :default => DEFAULT_STRING_FIELD_VALUE_MAX_LENGTH,
+       :desc => 'Maximum length of a string field value; longer string values are truncated to this length (default -1, meaning no limit).'
config_param :unique_key_field, :string, :default => nil,
:desc => 'A field name of unique key in the Solr schema.xml. If omitted, it will get unique key via Solr Schema API.'
config_param :tag_field, :string, :default => DEFAULT_TAG_FIELD,
- :desc => 'A field name of fluentd tag in the Solr schema.xml (default event_timestamp).'
+       :desc => 'A field name of fluentd tag in the Solr schema.xml (default tag).'
config_param :timestamp_field, :string, :default => DEFAULT_TIMESTAMP_FIELD,
- :desc => 'A field name of event timestamp in the Solr schema.xml (default event_timestamp).'
+ :desc => 'A field name of event timestamp in the Solr schema.xml (default time).'
config_param :flush_size, :integer, :default => DEFAULT_FLUSH_SIZE,
:desc => 'A number of events to queue up before writing to Solr (default 100).'
config_param :commit_with_flush, :bool, :default => DEFAULT_COMMIT_WITH_FLUSH,
@@ -60,10 +63,11 @@
end
def configure(conf)
compat_parameters_convert(conf, :inject)
super
+ raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag
end
def start
super
@@ -140,10 +144,34 @@
record.delete(key)
end
end
end
+ if @string_field_value_max_length >= 0 then
+ record.each_key do |key|
+ if record[key].instance_of?(Array) then
+ values = []
+ record[key].each do |value|
+ if value.instance_of?(String) then
+ if value.length > @string_field_value_max_length then
+ log.warn "#{key} is too long (#{value.length}, max is #{@string_field_value_max_length})."
+ values.push(value.slice(0, @string_field_value_max_length))
+ else
+ values.push(value)
+ end
+ end
+ end
+ record[key] = values
+ elsif record[key].instance_of?(String) then
+ if record[key].length > @string_field_value_max_length then
+ log.warn "#{key} is too long (#{record[key].length}, max is #{@string_field_value_max_length})."
+ record[key] = record[key].slice(0, @string_field_value_max_length)
+ end
+ end
+ end
+ end
+
documents << record
if documents.count >= @flush_size
update documents
documents.clear
@@ -157,11 +185,11 @@
if @mode == MODE_STANDALONE then
@solr.add documents, :params => {:commit => @commit_with_flush}
log.debug "Added %d document(s) to Solr" % documents.count
elsif @mode == MODE_SOLRCLOUD then
@solr.add documents, collection: @collection, :params => {:commit => @commit_with_flush}
- log.debug "Update: Added %d document(s) to Solr" % documents.count
+ log.debug "Added #{documents.count} document(s) to Solr"
end
rescue Exception => e
log.warn "Update: An error occurred while indexing: #{e.message}"
end
@@ -178,11 +206,11 @@
log.debug "Unique key: #{unique_key}"
return unique_key
rescue Exception => e
- log.warn "Unique key: #{e.message}"
+ log.warn "An error occurred: #{e.message}"
end
def get_fields
response = nil
@@ -199,9 +227,9 @@
log.debug "Fields: #{fields}"
return fields
rescue Exception => e
- log.warn "Fields: #{e.message}"
+ log.warn "An error occurred: #{e.message}"
end
end
end