lib/fluent/plugin/out_solr.rb in fluent-plugin-output-solr-0.1.1 vs lib/fluent/plugin/out_solr.rb in fluent-plugin-output-solr-0.2.0
- old
+ new
@@ -5,37 +5,64 @@
module Fluent
class SolrOutput < BufferedOutput
Fluent::Plugin.register_output('solr', self)
+ DEFAULT_COLLECTION = 'collection1'
+ DEFAULT_IGNORE_UNDEFINED_FIELDS = false
+ DEFAULT_TIMESTAMP_FIELD = 'event_timestamp'
+ DEFAULT_FLUSH_SIZE = 100
+
+ MODE_STANDALONE = 'Standalone'
+ MODE_SOLRCLOUD = 'SolrCloud'
+
+ include Fluent::SetTagKeyMixin
+ config_set_default :include_tag_key, false
+
+ include Fluent::SetTimeKeyMixin
+ config_set_default :include_time_key, false
+
config_param :url, :string, :default => nil,
- :desc => 'The Solr server url (for example http://localhost:8983/solr/collection1).'
+ :desc => 'The Solr server url (for example http://localhost:8983/solr/collection1).'
config_param :zk_host, :string, :default => nil,
- :desc => 'The ZooKeeper connection string that SolrCloud refers to (for example localhost:2181/solr).'
- config_param :collection, :string, :default => 'collection1',
- :desc => 'The SolrCloud collection name.'
+ :desc => 'The ZooKeeper connection string that SolrCloud refers to (for example localhost:2181/solr).'
+ config_param :collection, :string, :default => DEFAULT_COLLECTION,
+ :desc => 'The SolrCloud collection name (default collection1).'
- config_param :batch_size, :integer, :default => 100,
- :desc => 'The batch size used in update.'
+ config_param :defined_fields, :array, :default => nil,
+ :desc => 'The defined fields in the Solr schema.xml. If omitted, it will get fields via Solr Schema API.'
+ config_param :ignore_undefined_fields, :bool, :default => DEFAULT_IGNORE_UNDEFINED_FIELDS,
+ :desc => 'Ignore undefined fields in the Solr schema.xml.'
- MODE_STANDALONE = 'Standalone'
- MODE_SOLRCLOUD = 'SolrCloud'
+ config_param :unique_key_field, :string, :default => nil,
+ :desc => 'A field name of unique key in the Solr schema.xml. If omitted, it will get unique key via Solr Schema API.'
+ config_param :timestamp_field, :string, :default => DEFAULT_TIMESTAMP_FIELD,
+ :desc => 'A field name of event timestamp in the Solr schema.xml (default event_timestamp).'
+ config_param :flush_size, :integer, :default => DEFAULT_FLUSH_SIZE,
+ :desc => 'A number of events to queue up before writing to Solr (default 100).'
+
def initialize
super
end
def configure(conf)
super
@url = conf['url']
@zk_host = conf['zk_host']
- @collection = conf['collection']
+ @collection = conf.has_key?('collection') ? conf['collection'] : DEFAULT_COLLECTION
- @batch_size = conf['batch_size'].to_i
+ @defined_fields = conf['defined_fields']
+ @ignore_undefined_fields = conf.has_key?('ignore_undefined_fields') ? conf['ignore_undefined_fields'] : DEFAULT_IGNORE_UNDEFINED_FIELDS
+
+ @unique_key_field = conf['unique_key_field']
+ @timestamp_field = conf.has_key?('timestamp_field') ? conf['timestamp_field'] : DEFAULT_TIMESTAMP_FIELD
+
+ @flush_size = conf.has_key?('flush_size') ? conf['flush_size'].to_i : DEFAULT_FLUSH_SIZE
end
def start
super
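
As a quick illustration of the default handling added to configure above: the has_key? pattern falls back to the new DEFAULT_* constants when a setting is absent and coerces flush_size from the raw string value. The conf hashes below are hypothetical examples, not taken from the gem:

    DEFAULT_FLUSH_SIZE = 100

    conf = { 'url' => 'http://localhost:8983/solr/collection1' }
    conf.has_key?('flush_size') ? conf['flush_size'].to_i : DEFAULT_FLUSH_SIZE
    # => 100 (no flush_size key, so the default applies)

    conf = { 'flush_size' => '20' }
    conf.has_key?('flush_size') ? conf['flush_size'].to_i : DEFAULT_FLUSH_SIZE
    # => 20 (the raw string from the config is coerced with to_i)
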
@@ -71,23 +98,37 @@
end
def write(chunk)
documents = []
+ @fields = @defined_fields.nil? ? get_fields : @defined_fields
+ @unique_key = @unique_key_field.nil? ? get_unique_key : @unique_key_field
+
chunk.msgpack_each do |tag, time, record|
- unless record.has_key?('id') then
- record.merge!({'id' => SecureRandom.uuid})
+
+ unless record.has_key?(@unique_key) then
+ record.merge!({@unique_key => SecureRandom.uuid})
end
+ record.merge!({@timestamp_field => Time.at(time).utc.strftime('%FT%TZ')})
+
+ if @ignore_undefined_fields then
+ record.each_key do |key|
+ unless @fields.include?(key) then
+ record.delete(key)
+ end
+ end
+ end
+
documents << record
-
- if documents.count >= @batch_size
+
+ if documents.count >= @flush_size
update documents
documents.clear
end
end
-
+
update documents unless documents.empty?
end
def update(documents)
if @mode == MODE_STANDALONE then
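
The write method above stamps each record with timestamp_field in the UTC form Solr date fields expect, generates a UUID when the unique key is missing, and flushes a batch every flush_size documents. A small sketch of the two value-generating calls (the epoch value is arbitrary):

    require 'securerandom'

    Time.at(1451606400).utc.strftime('%FT%TZ')  # => "2016-01-01T00:00:00Z"
    SecureRandom.uuid                           # => a random 36-character UUID string
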
@@ -95,8 +136,49 @@
log.info "Added %d document(s) to Solr" % documents.count
elsif @mode == MODE_SOLRCLOUD then
@solr.add documents, collection: @collection, :params => {:commit => true}
log.info "Added %d document(s) to Solr" % documents.count
end
+ rescue Exception => e
+ log.warn("An error occurred while indexing: #{e.message}")
+ end
+
+ def get_unique_key
+ response = nil
+
+ if @mode == MODE_STANDALONE then
+ response = @solr.get 'schema/uniquekey'
+ elsif @mode == MODE_SOLRCLOUD then
+ response = @solr.get 'schema/uniquekey', collection: @collection
+ end
+
+ unique_key = response['uniqueKey']
+ log.info ("Unique key: #{unique_key}")
+
+ return unique_key
+
+ rescue Exception => e
+ log.warn("An error occurred while indexing: #{e.message}")
+ end
+
+ def get_fields
+ response = nil
+
+ if @mode == MODE_STANDALONE then
+ response = @solr.get 'schema/fields'
+ elsif @mode == MODE_SOLRCLOUD then
+ response = @solr.get 'schema/fields', collection: @collection
+ end
+
+ fields = []
+ response['fields'].each do |field|
+ fields.push(field['name'])
+ end
+ log.info ("Fields: #{fields}")
+
+ return fields
+
+ rescue Exception => e
+ log.warn("An error occurred while indexing: #{e.message}")
end
end
end
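
For context on the two schema helpers added above: the Solr Schema API endpoints they call (schema/fields and schema/uniquekey) return JSON of roughly the shape sketched below, which the methods reduce to a list of field names and the unique key. The field names here are placeholders, and the real responses also carry a responseHeader:

    fields_response     = { 'fields' => [{ 'name' => 'id' }, { 'name' => 'title' }, { 'name' => 'event_timestamp' }] }
    unique_key_response = { 'uniqueKey' => 'id' }

    fields_response['fields'].map { |f| f['name'] }  # => ["id", "title", "event_timestamp"]
    unique_key_response['uniqueKey']                 # => "id"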