lib/fluent/plugin/out_solr.rb in fluent-plugin-output-solr-0.4.12 vs lib/fluent/plugin/out_solr.rb in fluent-plugin-output-solr-0.4.13

- old
+ new

@@ -6,16 +6,19 @@ module Fluent class SolrOutput < BufferedOutput Fluent::Plugin.register_output('solr', self) DEFAULT_COLLECTION = 'collection1' - DEFAULT_IGNORE_UNDEFINED_FIELDS = false - DEFAULT_STRING_FIELD_VALUE_MAX_LENGTH = -1 + DEFAULT_TAG_FIELD = 'tag' + DEFAULT_TIME_FIELD = 'time' DEFAULT_TIME_FORMAT = '%FT%TZ' DEFAULT_MILLISECOND = false + + DEFAULT_IGNORE_UNDEFINED_FIELDS = false + DEFAULT_FLUSH_SIZE = 100 DEFAULT_COMMIT_WITH_FLUSH = true MODE_STANDALONE = 'Standalone' MODE_SOLRCLOUD = 'SolrCloud' @@ -24,29 +27,25 @@ config_set_default :include_tag_key, false include Fluent::SetTimeKeyMixin config_set_default :include_time_key, false - config_param :url, :string, :default => nil, - :desc => 'The Solr server url (for example http://localhost:8983/solr/collection1).' + config_param :base_url, :string, :default => nil, + :desc => 'The Solr base url (for example http://localhost:8983/solr).' config_param :zk_host, :string, :default => nil, :desc => 'The ZooKeeper connection string that SolrCloud refers to (for example localhost:2181/solr).' + config_param :collection, :string, :default => DEFAULT_COLLECTION, - :desc => 'The SolrCloud collection name (default collection1).' + :desc => 'The Solr collection/core name (default collection1).' - config_param :defined_fields, :array, :default => nil, - :desc => 'The defined fields in the Solr schema.xml. If omitted, it will get fields via Solr Schema API.' config_param :ignore_undefined_fields, :bool, :default => DEFAULT_IGNORE_UNDEFINED_FIELDS, :desc => 'Ignore undefined fields in the Solr schema.xml.' - config_param :string_field_value_max_length, :integer, :default => DEFAULT_STRING_FIELD_VALUE_MAX_LENGTH, - :desc => 'Field value max length.' - config_param :unique_key_field, :string, :default => nil, - :desc => 'A field name of unique key in the Solr schema.xml. If omitted, it will get unique key via Solr Schema API.' config_param :tag_field, :string, :default => DEFAULT_TAG_FIELD, :desc => 'A field name of fluentd tag in the Solr schema.xml (default tag).' + config_param :time_field, :string, :default => DEFAULT_TIME_FIELD, :desc => 'A field name of event timestamp in the Solr schema.xml (default time).' config_param :time_format, :string, :default => DEFAULT_TIME_FORMAT, :desc => 'The format of the time field (default %d/%b/%Y:%H:%M:%S %z).' config_param :millisecond, :bool, :default => DEFAULT_MILLISECOND, @@ -68,21 +67,21 @@ def start super @mode = nil - if ! @url.nil? then + if ! @base_url.nil? then @mode = MODE_STANDALONE elsif ! @zk_host.nil? @mode = MODE_SOLRCLOUD end @solr = nil @zk = nil if @mode == MODE_STANDALONE then - @solr = RSolr.connect :url => @url + @solr = RSolr.connect :url => @base_url.end_with?('/') ? @base_url + @collection : @base_url + '/' + @collection elsif @mode == MODE_SOLRCLOUD then @zk = ZK.new(@zk_host) cloud_connection = RSolr::Cloud::Connection.new(@zk) @solr = RSolr::Client.new(cloud_connection, read_timeout: 60, open_timeout: 60) end @@ -101,147 +100,116 @@ end def write(chunk) documents = [] - @fields = @defined_fields.nil? ? get_fields : @defined_fields - @unique_key = @unique_key_field.nil? ? get_unique_key : @unique_key_field + # Get fields from Solr + fields = get_fields + # Get unique key field from Solr + unique_key = get_unique_key + chunk.msgpack_each do |tag, time, record| - unless record.has_key?(@unique_key) then - record.merge!({@unique_key => SecureRandom.uuid}) + # Set unique key and value + unless record.has_key?(unique_key) then + record.merge!({unique_key => SecureRandom.uuid}) end + # Set Fluentd tag to Solr tag field unless record.has_key?(@tag_field) then record.merge!({@tag_field => tag}) end + # Set time + tmp_time = Time.at(time).utc if record.has_key?(@time_field) then + # Parsing the time field in the record by the specified format. begin tmp_time = Time.strptime(record[@time_field], @time_format).utc - if @millisecond then - record.merge!({@time_field => '%s.%03dZ' % [tmp_time.strftime('%FT%T'), tmp_time.usec / 1000.0]}) - else - record.merge!({@time_field => tmp_time.strftime('%FT%TZ')}) - end - rescue - tmp_time = Time.at(time).utc - if @millisecond then - record.merge!({@time_field => '%s.%03dZ' % [tmp_time.strftime('%FT%T'), tmp_time.usec / 1000.0]}) - else - record.merge!({@time_field => tmp_time.strftime('%FT%TZ')}) - end + rescue Exception => e + log.warn "An error occurred in parsing the time field: #{e.message}" end + end + if @millisecond then + record.merge!({@time_field => '%s.%03dZ' % [tmp_time.strftime('%FT%T'), tmp_time.usec / 1000.0]}) else - tmp_time = Time.at(time).utc - if @millisecond then - record.merge!({@time_field => '%s.%03dZ' % [tmp_time.strftime('%FT%T'), tmp_time.usec / 1000.0]}) - else - record.merge!({@time_field => tmp_time.strftime('%FT%TZ')}) - end + record.merge!({@time_field => tmp_time.strftime('%FT%TZ')}) end + # Ignore undefined fields if @ignore_undefined_fields then record.each_key do |key| - unless @fields.include?(key) then + unless fields.include?(key) then record.delete(key) end end end - if @string_field_value_max_length >= 0 then - record.each_key do |key| - if record[key].instance_of?(Array) then - values = [] - record[key].each do |value| - if value.instance_of?(String) then - if value.length > @string_field_value_max_length then - log.warn "#{key} is too long (#{value.length}, max is #{@string_field_value_max_length})." - values.push(value.slice(0, @string_field_value_max_length)) - else - values.push(value) - end - end - end - record[key] = values - elsif record[key].instance_of?(String) then - if record[key].length > @string_field_value_max_length then - log.warn "#{key} is too long (#{record[key].length}, max is #{@string_field_value_max_length})." - record[key] = record[key].slice(0, @string_field_value_max_length) - end - end - end - end - - # - # delete reserved fields - # https://cwiki.apache.org/confluence/display/solr/Defining+Fields - # - record.each_key do |key| - if key[0] == '_' and key[-1] == '_' then - record.delete(key) - end - end - + # Add record to documents documents << record + # Update when flash size is reached if documents.count >= @flush_size update documents documents.clear end end + # Update remaining documents update documents unless documents.empty? end def update(documents) - if @mode == MODE_STANDALONE then - @solr.add documents, :params => {:commit => @commit_with_flush} - log.debug "Added #{documents.count} document(s) to Solr" - elsif @mode == MODE_SOLRCLOUD then - @solr.add documents, collection: @collection, :params => {:commit => @commit_with_flush} - log.debug "Added #{documents.count} document(s) to Solr" + begin + if @mode == MODE_STANDALONE then + @solr.add documents, :params => {:commit => @commit_with_flush} + elsif @mode == MODE_SOLRCLOUD then + @solr.add documents, collection: @collection, :params => {:commit => @commit_with_flush} + end + log.debug "Sent #{documents.count} document(s) to Solr" + rescue Exception + log.warn "An error occurred while sending #{documents.count} document(s) to Solr" end - rescue Exception => e - log.warn "An error occurred while indexing" end def get_unique_key - response = nil + unique_key = 'id' - if @mode == MODE_STANDALONE then - response = @solr.get 'schema/uniquekey' - elsif @mode == MODE_SOLRCLOUD then - response = @solr.get 'schema/uniquekey', collection: @collection + begin + response = nil + if @mode == MODE_STANDALONE then + response = @solr.get 'schema/uniquekey' + elsif @mode == MODE_SOLRCLOUD then + response = @solr.get 'schema/uniquekey', collection: @collection + end + unique_key = response['uniqueKey'] + log.debug "Unique key: #{unique_key}" + rescue Exception + log.warn 'An error occurred while getting unique key' end - unique_key = response['uniqueKey'] - log.debug "Unique key: #{unique_key}" - return unique_key - - rescue Exception => e - log.warn "An error occurred while getting unique key" end def get_fields - response = nil + fields = [] - if @mode == MODE_STANDALONE then - response = @solr.get 'schema/fields' - elsif @mode == MODE_SOLRCLOUD then - response = @solr.get 'schema/fields', collection: @collection - end + begin + response = nil - fields = [] - response['fields'].each do |field| - fields.push(field['name']) + if @mode == MODE_STANDALONE then + response = @solr.get 'schema/fields' + elsif @mode == MODE_SOLRCLOUD then + response = @solr.get 'schema/fields', collection: @collection + end + response['fields'].each do |field| + fields.push(field['name']) + end + log.debug "Fields: #{fields}" + rescue Exception + log.warn 'An error occurred while getting fields' end - log.debug "Fields: #{fields}" return fields - - rescue Exception => e - log.warn "An error occurred while getting fields" end end end