lib/fluent/plugin/out_solr.rb in fluent-plugin-output-solr-1.0.0 vs lib/fluent/plugin/out_solr.rb in fluent-plugin-output-solr-1.0.1

- old
+ new

@@ -10,12 +10,13 @@ helpers :inject, :compat_parameters DEFAULT_COLLECTION = 'collection1' DEFAULT_IGNORE_UNDEFINED_FIELDS = false + DEFAULT_STRING_FIELD_VALUE_MAX_LENGTH = -1 DEFAULT_TAG_FIELD = 'tag' - DEFAULT_TIMESTAMP_FIELD = 'event_timestamp' + DEFAULT_TIMESTAMP_FIELD = 'time' DEFAULT_FLUSH_SIZE = 100 DEFAULT_BUFFER_TYPE = "memory" DEFAULT_COMMIT_WITH_FLUSH = true MODE_STANDALONE = 'Standalone' @@ -34,17 +35,19 @@ config_param :defined_fields, :array, :default => nil, :desc => 'The defined fields in the Solr schema.xml. If omitted, it will get fields via Solr Schema API.' config_param :ignore_undefined_fields, :bool, :default => DEFAULT_IGNORE_UNDEFINED_FIELDS, :desc => 'Ignore undefined fields in the Solr schema.xml.' + config_param :string_field_value_max_length, :integer, :default => DEFAULT_STRING_FIELD_VALUE_MAX_LENGTH, + :desc => 'Field value max length.' config_param :unique_key_field, :string, :default => nil, :desc => 'A field name of unique key in the Solr schema.xml. If omitted, it will get unique key via Solr Schema API.' config_param :tag_field, :string, :default => DEFAULT_TAG_FIELD, - :desc => 'A field name of fluentd tag in the Solr schema.xml (default event_timestamp).' + :desc => 'A field name of fluentd tag in the Solr schema.xml (default time).' config_param :timestamp_field, :string, :default => DEFAULT_TIMESTAMP_FIELD, - :desc => 'A field name of event timestamp in the Solr schema.xml (default event_timestamp).' + :desc => 'A field name of event timestamp in the Solr schema.xml (default time).' config_param :flush_size, :integer, :default => DEFAULT_FLUSH_SIZE, :desc => 'A number of events to queue up before writing to Solr (default 100).' config_param :commit_with_flush, :bool, :default => DEFAULT_COMMIT_WITH_FLUSH, @@ -60,10 +63,11 @@ end def configure(conf) compat_parameters_convert(conf, :inject) super + raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag end def start super @@ -140,10 +144,34 @@ record.delete(key) end end end + if @string_field_value_max_length >= 0 then + record.each_key do |key| + if record[key].instance_of?(Array) then + values = [] + record[key].each do |value| + if value.instance_of?(String) then + if value.length > @string_field_value_max_length then + log.warn "#{key} is too long (#{value.length}, max is #{@string_field_value_max_length})." + values.push(value.slice(0, @string_field_value_max_length)) + else + values.push(value) + end + end + end + record[key] = values + elsif record[key].instance_of?(String) then + if record[key].length > @string_field_value_max_length then + log.warn "#{key} is too long (#{record[key].length}, max is #{@string_field_value_max_length})." + record[key] = record[key].slice(0, @string_field_value_max_length) + end + end + end + end + documents << record if documents.count >= @flush_size update documents documents.clear @@ -157,11 +185,11 @@ if @mode == MODE_STANDALONE then @solr.add documents, :params => {:commit => @commit_with_flush} log.debug "Added %d document(s) to Solr" % documents.count elsif @mode == MODE_SOLRCLOUD then @solr.add documents, collection: @collection, :params => {:commit => @commit_with_flush} - log.debug "Update: Added %d document(s) to Solr" % documents.count + log.debug "Added #{documents.count} document(s) to Solr" end rescue Exception => e log.warn "Update: An error occurred while indexing: #{e.message}" end @@ -178,11 +206,11 @@ log.debug "Unique key: #{unique_key}" return unique_key rescue Exception => e - log.warn "Unique key: #{e.message}" + log.warn "An error occurred: #{e.message}" end def get_fields response = nil @@ -199,9 +227,9 @@ log.debug "Fields: #{fields}" return fields rescue Exception => e - log.warn "Fields: #{e.message}" + log.warn "An error occurred: #{e.message}" end end end