lib/fluent/plugin/out_solr.rb in fluent-plugin-output-solr-0.4.12 vs lib/fluent/plugin/out_solr.rb in fluent-plugin-output-solr-0.4.13
- old
+ new
@@ -6,16 +6,19 @@
module Fluent
class SolrOutput < BufferedOutput
Fluent::Plugin.register_output('solr', self)
DEFAULT_COLLECTION = 'collection1'
- DEFAULT_IGNORE_UNDEFINED_FIELDS = false
- DEFAULT_STRING_FIELD_VALUE_MAX_LENGTH = -1
+
DEFAULT_TAG_FIELD = 'tag'
+
DEFAULT_TIME_FIELD = 'time'
DEFAULT_TIME_FORMAT = '%FT%TZ'
DEFAULT_MILLISECOND = false
+
+ DEFAULT_IGNORE_UNDEFINED_FIELDS = false
+
DEFAULT_FLUSH_SIZE = 100
DEFAULT_COMMIT_WITH_FLUSH = true
MODE_STANDALONE = 'Standalone'
MODE_SOLRCLOUD = 'SolrCloud'
@@ -24,29 +27,25 @@
config_set_default :include_tag_key, false
include Fluent::SetTimeKeyMixin
config_set_default :include_time_key, false
- config_param :url, :string, :default => nil,
- :desc => 'The Solr server url (for example http://localhost:8983/solr/collection1).'
+ config_param :base_url, :string, :default => nil,
+ :desc => 'The Solr base url (for example http://localhost:8983/solr).'
config_param :zk_host, :string, :default => nil,
:desc => 'The ZooKeeper connection string that SolrCloud refers to (for example localhost:2181/solr).'
+
config_param :collection, :string, :default => DEFAULT_COLLECTION,
- :desc => 'The SolrCloud collection name (default collection1).'
+ :desc => 'The Solr collection/core name (default collection1).'
- config_param :defined_fields, :array, :default => nil,
- :desc => 'The defined fields in the Solr schema.xml. If omitted, it will get fields via Solr Schema API.'
config_param :ignore_undefined_fields, :bool, :default => DEFAULT_IGNORE_UNDEFINED_FIELDS,
:desc => 'Ignore undefined fields in the Solr schema.xml.'
- config_param :string_field_value_max_length, :integer, :default => DEFAULT_STRING_FIELD_VALUE_MAX_LENGTH,
- :desc => 'Field value max length.'
- config_param :unique_key_field, :string, :default => nil,
- :desc => 'A field name of unique key in the Solr schema.xml. If omitted, it will get unique key via Solr Schema API.'
config_param :tag_field, :string, :default => DEFAULT_TAG_FIELD,
:desc => 'A field name of fluentd tag in the Solr schema.xml (default tag).'
+
config_param :time_field, :string, :default => DEFAULT_TIME_FIELD,
:desc => 'A field name of event timestamp in the Solr schema.xml (default time).'
config_param :time_format, :string, :default => DEFAULT_TIME_FORMAT,
:desc => 'The format of the time field (default %d/%b/%Y:%H:%M:%S %z).'
config_param :millisecond, :bool, :default => DEFAULT_MILLISECOND,
@@ -68,21 +67,21 @@
def start
super
@mode = nil
- if ! @url.nil? then
+ if ! @base_url.nil? then
@mode = MODE_STANDALONE
elsif ! @zk_host.nil?
@mode = MODE_SOLRCLOUD
end
@solr = nil
@zk = nil
if @mode == MODE_STANDALONE then
- @solr = RSolr.connect :url => @url
+ @solr = RSolr.connect :url => @base_url.end_with?('/') ? @base_url + @collection : @base_url + '/' + @collection
elsif @mode == MODE_SOLRCLOUD then
@zk = ZK.new(@zk_host)
cloud_connection = RSolr::Cloud::Connection.new(@zk)
@solr = RSolr::Client.new(cloud_connection, read_timeout: 60, open_timeout: 60)
end
@@ -101,147 +100,116 @@
end
def write(chunk)
documents = []
- @fields = @defined_fields.nil? ? get_fields : @defined_fields
- @unique_key = @unique_key_field.nil? ? get_unique_key : @unique_key_field
+ # Get fields from Solr
+ fields = get_fields
+ # Get unique key field from Solr
+ unique_key = get_unique_key
+
chunk.msgpack_each do |tag, time, record|
- unless record.has_key?(@unique_key) then
- record.merge!({@unique_key => SecureRandom.uuid})
+ # Set unique key and value
+ unless record.has_key?(unique_key) then
+ record.merge!({unique_key => SecureRandom.uuid})
end
+ # Set Fluentd tag to Solr tag field
unless record.has_key?(@tag_field) then
record.merge!({@tag_field => tag})
end
+ # Set time
+ tmp_time = Time.at(time).utc
if record.has_key?(@time_field) then
+ # Parsing the time field in the record by the specified format.
begin
tmp_time = Time.strptime(record[@time_field], @time_format).utc
- if @millisecond then
- record.merge!({@time_field => '%s.%03dZ' % [tmp_time.strftime('%FT%T'), tmp_time.usec / 1000.0]})
- else
- record.merge!({@time_field => tmp_time.strftime('%FT%TZ')})
- end
- rescue
- tmp_time = Time.at(time).utc
- if @millisecond then
- record.merge!({@time_field => '%s.%03dZ' % [tmp_time.strftime('%FT%T'), tmp_time.usec / 1000.0]})
- else
- record.merge!({@time_field => tmp_time.strftime('%FT%TZ')})
- end
+ rescue Exception => e
+ log.warn "An error occurred in parsing the time field: #{e.message}"
end
+ end
+ if @millisecond then
+ record.merge!({@time_field => '%s.%03dZ' % [tmp_time.strftime('%FT%T'), tmp_time.usec / 1000.0]})
else
- tmp_time = Time.at(time).utc
- if @millisecond then
- record.merge!({@time_field => '%s.%03dZ' % [tmp_time.strftime('%FT%T'), tmp_time.usec / 1000.0]})
- else
- record.merge!({@time_field => tmp_time.strftime('%FT%TZ')})
- end
+ record.merge!({@time_field => tmp_time.strftime('%FT%TZ')})
end
+ # Ignore undefined fields
if @ignore_undefined_fields then
record.each_key do |key|
- unless @fields.include?(key) then
+ unless fields.include?(key) then
record.delete(key)
end
end
end
- if @string_field_value_max_length >= 0 then
- record.each_key do |key|
- if record[key].instance_of?(Array) then
- values = []
- record[key].each do |value|
- if value.instance_of?(String) then
- if value.length > @string_field_value_max_length then
- log.warn "#{key} is too long (#{value.length}, max is #{@string_field_value_max_length})."
- values.push(value.slice(0, @string_field_value_max_length))
- else
- values.push(value)
- end
- end
- end
- record[key] = values
- elsif record[key].instance_of?(String) then
- if record[key].length > @string_field_value_max_length then
- log.warn "#{key} is too long (#{record[key].length}, max is #{@string_field_value_max_length})."
- record[key] = record[key].slice(0, @string_field_value_max_length)
- end
- end
- end
- end
-
- #
- # delete reserved fields
- # https://cwiki.apache.org/confluence/display/solr/Defining+Fields
- #
- record.each_key do |key|
- if key[0] == '_' and key[-1] == '_' then
- record.delete(key)
- end
- end
-
+ # Add record to documents
documents << record
+ # Update when flash size is reached
if documents.count >= @flush_size
update documents
documents.clear
end
end
+ # Update remaining documents
update documents unless documents.empty?
end
def update(documents)
- if @mode == MODE_STANDALONE then
- @solr.add documents, :params => {:commit => @commit_with_flush}
- log.debug "Added #{documents.count} document(s) to Solr"
- elsif @mode == MODE_SOLRCLOUD then
- @solr.add documents, collection: @collection, :params => {:commit => @commit_with_flush}
- log.debug "Added #{documents.count} document(s) to Solr"
+ begin
+ if @mode == MODE_STANDALONE then
+ @solr.add documents, :params => {:commit => @commit_with_flush}
+ elsif @mode == MODE_SOLRCLOUD then
+ @solr.add documents, collection: @collection, :params => {:commit => @commit_with_flush}
+ end
+ log.debug "Sent #{documents.count} document(s) to Solr"
+ rescue Exception
+ log.warn "An error occurred while sending #{documents.count} document(s) to Solr"
end
- rescue Exception => e
- log.warn "An error occurred while indexing"
end
def get_unique_key
- response = nil
+ unique_key = 'id'
- if @mode == MODE_STANDALONE then
- response = @solr.get 'schema/uniquekey'
- elsif @mode == MODE_SOLRCLOUD then
- response = @solr.get 'schema/uniquekey', collection: @collection
+ begin
+ response = nil
+ if @mode == MODE_STANDALONE then
+ response = @solr.get 'schema/uniquekey'
+ elsif @mode == MODE_SOLRCLOUD then
+ response = @solr.get 'schema/uniquekey', collection: @collection
+ end
+ unique_key = response['uniqueKey']
+ log.debug "Unique key: #{unique_key}"
+ rescue Exception
+ log.warn 'An error occurred while getting unique key'
end
- unique_key = response['uniqueKey']
- log.debug "Unique key: #{unique_key}"
-
return unique_key
-
- rescue Exception => e
- log.warn "An error occurred while getting unique key"
end
def get_fields
- response = nil
+ fields = []
- if @mode == MODE_STANDALONE then
- response = @solr.get 'schema/fields'
- elsif @mode == MODE_SOLRCLOUD then
- response = @solr.get 'schema/fields', collection: @collection
- end
+ begin
+ response = nil
- fields = []
- response['fields'].each do |field|
- fields.push(field['name'])
+ if @mode == MODE_STANDALONE then
+ response = @solr.get 'schema/fields'
+ elsif @mode == MODE_SOLRCLOUD then
+ response = @solr.get 'schema/fields', collection: @collection
+ end
+ response['fields'].each do |field|
+ fields.push(field['name'])
+ end
+ log.debug "Fields: #{fields}"
+ rescue Exception
+ log.warn 'An error occurred while getting fields'
end
- log.debug "Fields: #{fields}"
return fields
-
- rescue Exception => e
- log.warn "An error occurred while getting fields"
end
end
end