lib/esse/index/documents.rb in esse-0.2.6 vs lib/esse/index/documents.rb in esse-0.3.0

- old
+ new

@@ -162,23 +162,24 @@ # @return [Array<Esse::Import::RequestBody>] The list of request bodies. @TODO Change this to a Stats object # # @see https://www.elastic.co/guide/en/elasticsearch/reference/7.5/docs-bulk.html # @see https://github.com/elastic/elasticsearch-ruby/blob/main/elasticsearch-api/lib/elasticsearch/api/utils.rb # @see https://github.com/elastic/elasticsearch-ruby/blob/main/elasticsearch-api/lib/elasticsearch/api/actions/bulk.rb - def bulk(index: nil, delete: nil, create: nil, type: nil, suffix: nil, **options) + def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: nil, **options) definition = { index: index_name(suffix: suffix), type: type, }.merge(options) cluster.may_update_type!(definition) # @TODO Wrap the return in some other Stats object with more information Esse::Import::Bulk.new( **definition.slice(:type), - index: index, - delete: delete, create: create, + delete: delete, + index: index, + update: update, ).each_request do |request_body| cluster.api.bulk(**definition, body: request_body.body) do |event_payload| event_payload[:body_stats] = request_body.stats if bulk_wait_interval > 0 event_payload[:wait_interval] = bulk_wait_interval @@ -196,23 +197,56 @@ # @param options [Hash] Hash of parameters that will be passed along to elasticsearch request # @option [String, nil] :suffix The index suffix. Defaults to nil. # @option [Hash] :context The collection context. This value will be passed as argument to the collection # May be SQL condition or any other filter you have defined on the collection. # @return [Numeric] The number of documents imported - def import(*repo_types, context: {}, suffix: nil, **options) + def import(*repo_types, context: {}, eager_include_document_attributes: false, lazy_update_document_attributes: false, suffix: nil, **options) repo_types = repo_hash.keys if repo_types.empty? 
count = 0 + repo_hash.slice(*repo_types).each do |repo_name, repo| + doc_attrs = {eager: [], lazy: []} + if (expected = eager_include_document_attributes) != false + allowed = repo.lazy_document_attributes.keys + doc_attrs[:eager] = (expected == true) ? allowed : Array(expected).map(&:to_s) & allowed + end + if (expected = lazy_update_document_attributes) != false + allowed = repo.lazy_document_attributes.keys + doc_attrs[:lazy] = (expected == true) ? allowed : Array(expected).map(&:to_s) & allowed + doc_attrs[:lazy] -= doc_attrs[:eager] + end + repo.each_serialized_batch(**(context || {})) do |batch| # Elasticsearch 6.x and older have multiple types per index. # This gem supports multiple types per index for backward compatibility, but we recommend to update # your elasticsearch to at least a 7.x version and use a single type per index. # # Note that the repository name will be used as the document type. # mapping_default_type - kwargs = { index: batch, suffix: suffix, type: repo_name, **options } + kwargs = { suffix: suffix, type: repo_name, **options } cluster.may_update_type!(kwargs) - bulk(**kwargs) + + doc_attrs[:eager].each do |attr_name| + partial_docs = repo.documents_for_lazy_attribute(attr_name, *batch.reject(&:ignore_on_index?)) + next if partial_docs.empty? + + partial_docs.each do |part_doc| + doc = batch.find { |d| part_doc.id == d.id && part_doc.type == d.type && part_doc.routing == d.routing } + next unless doc + + doc.send(:__add_lazy_data_to_source__, part_doc.source) + end + end + + bulk(**kwargs, index: batch) + + doc_attrs[:lazy].each do |attr_name| + partial_docs = repo.documents_for_lazy_attribute(attr_name, *batch.reject(&:ignore_on_index?)) + next if partial_docs.empty? + + bulk(**kwargs, update: partial_docs) + end + count += batch.size end end count end