lib/esse/index/documents.rb in esse-0.2.6 vs lib/esse/index/documents.rb in esse-0.3.0
- old
+ new
@@ -162,23 +162,24 @@
# @return [Array<Esse::Import::RequestBody>] The list of request bodies. @TODO Change this to a Stats object
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/7.5/docs-bulk.html
# @see https://github.com/elastic/elasticsearch-ruby/blob/main/elasticsearch-api/lib/elasticsearch/api/utils.rb
# @see https://github.com/elastic/elasticsearch-ruby/blob/main/elasticsearch-api/lib/elasticsearch/api/actions/bulk.rb
- def bulk(index: nil, delete: nil, create: nil, type: nil, suffix: nil, **options)
+ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: nil, **options)
definition = {
index: index_name(suffix: suffix),
type: type,
}.merge(options)
cluster.may_update_type!(definition)
# @TODO Wrap the return in a some other Stats object with more information
Esse::Import::Bulk.new(
**definition.slice(:type),
- index: index,
- delete: delete,
create: create,
+ delete: delete,
+ index: index,
+ update: update,
).each_request do |request_body|
cluster.api.bulk(**definition, body: request_body.body) do |event_payload|
event_payload[:body_stats] = request_body.stats
if bulk_wait_interval > 0
event_payload[:wait_interval] = bulk_wait_interval
@@ -196,23 +197,56 @@
# @param options [Hash] Hash of paramenters that will be passed along to elasticsearch request
# @option [String, nil] :suffix The index suffix. Defaults to the nil.
# @option [Hash] :context The collection context. This value will be passed as argument to the collection
# May be SQL condition or any other filter you have defined on the collection.
# @return [Numeric] The number of documents imported
- def import(*repo_types, context: {}, suffix: nil, **options)
+ def import(*repo_types, context: {}, eager_include_document_attributes: false, lazy_update_document_attributes: false, suffix: nil, **options)
repo_types = repo_hash.keys if repo_types.empty?
count = 0
+
repo_hash.slice(*repo_types).each do |repo_name, repo|
+ doc_attrs = {eager: [], lazy: []}
+ if (expected = eager_include_document_attributes) != false
+ allowed = repo.lazy_document_attributes.keys
+ doc_attrs[:eager] = (expected == true) ? allowed : Array(expected).map(&:to_s) & allowed
+ end
+ if (expected = lazy_update_document_attributes) != false
+ allowed = repo.lazy_document_attributes.keys
+ doc_attrs[:lazy] = (expected == true) ? allowed : Array(expected).map(&:to_s) & allowed
+ doc_attrs[:lazy] -= doc_attrs[:eager]
+ end
+
repo.each_serialized_batch(**(context || {})) do |batch|
# Elasticsearch 6.x and older have multiple types per index.
# This gem supports multiple types per index for backward compatibility, but we recommend to update
# your elasticsearch to a at least 7.x version and use a single type per index.
#
# Note that the repository name will be used as the document type.
# mapping_default_type
- kwargs = { index: batch, suffix: suffix, type: repo_name, **options }
+ kwargs = { suffix: suffix, type: repo_name, **options }
cluster.may_update_type!(kwargs)
- bulk(**kwargs)
+
+ doc_attrs[:eager].each do |attr_name|
+ partial_docs = repo.documents_for_lazy_attribute(attr_name, *batch.reject(&:ignore_on_index?))
+ next if partial_docs.empty?
+
+ partial_docs.each do |part_doc|
+ doc = batch.find { |d| part_doc.id == d.id && part_doc.type == d.type && part_doc.routing == d.routing }
+ next unless doc
+
+ doc.send(:__add_lazy_data_to_source__, part_doc.source)
+ end
+ end
+
+ bulk(**kwargs, index: batch)
+
+ doc_attrs[:lazy].each do |attr_name|
+ partial_docs = repo.documents_for_lazy_attribute(attr_name, *batch.reject(&:ignore_on_index?))
+ next if partial_docs.empty?
+
+ bulk(**kwargs, update: partial_docs)
+ end
+
count += batch.size
end
end
count
end