lib/esse/index/documents.rb in esse-0.3.5 vs lib/esse/index/documents.rb in esse-0.4.0.rc1

- old
+ new

@@ -112,11 +112,11 @@ # # @see https://www.elastic.co/guide/en/elasticsearch/reference/7.5/docs-update.html def update(doc = nil, suffix: nil, **options) if document?(doc) options[:id] = doc.id - options[:body] = { doc: doc.source } + options[:body] = { doc: doc.mutated_source } options[:type] = doc.type if doc.type? options[:routing] = doc.routing if doc.routing? end require_kwargs!(options, :id, :body) options[:index] = index_name(suffix: suffix) @@ -138,11 +138,11 @@ # # @see https://www.elastic.co/guide/en/elasticsearch/reference/7.5/docs-index_.html def index(doc = nil, suffix: nil, **options) if document?(doc) options[:id] = doc.id - options[:body] = doc.source + options[:body] = doc.mutated_source options[:type] = doc.type if doc.type? options[:routing] = doc.routing if doc.routing? end require_kwargs!(options, :id, :body) options[:index] = index_name(suffix: suffix) @@ -169,17 +169,57 @@ index: index_name(suffix: suffix), type: type, }.merge(options) cluster.may_update_type!(definition) + to_index = [] + to_create = [] + to_update = [] + to_delete = [] + Esse::ArrayUtils.wrap(index).each do |doc| + if doc.is_a?(Hash) + to_index << doc + elsif Esse.document?(doc) && !doc.ignore_on_index? + hash = doc.to_bulk + hash[:_type] ||= type if type + to_index << hash + end + end + Esse::ArrayUtils.wrap(create).each do |doc| + if doc.is_a?(Hash) + to_create << doc + elsif Esse.document?(doc) && !doc.ignore_on_index? + hash = doc.to_bulk + hash[:_type] ||= type if type + to_create << hash + end + end + Esse::ArrayUtils.wrap(update).each do |doc| + if doc.is_a?(Hash) + to_update << doc + elsif Esse.document?(doc) && !doc.ignore_on_index? + hash = doc.to_bulk(operation: :update) + hash[:_type] ||= type if type + to_update << hash + end + end + Esse::ArrayUtils.wrap(delete).each do |doc| + if doc.is_a?(Hash) + to_delete << doc + elsif Esse.document?(doc) && !doc.ignore_on_delete? 
+ hash = doc.to_bulk(data: false) + hash[:_type] ||= type if type + to_delete << hash + end + end + # @TODO Wrap the return in some other Stats object with more information Esse::Import::Bulk.new( - **definition.slice(:type), - create: create, - delete: delete, - index: index, - update: update, + create: to_create, + delete: to_delete, + index: to_index, + update: to_update, ).each_request do |request_body| cluster.api.bulk(**definition, body: request_body.body) do |event_payload| event_payload[:body_stats] = request_body.stats if bulk_wait_interval > 0 event_payload[:wait_interval] = bulk_wait_interval @@ -196,39 +236,64 @@ # @param repos [Array<String>] List of repo types. Defaults to all types. # @param options [Hash] Hash of parameters that will be passed along to elasticsearch request # @option [String, nil] :suffix The index suffix. Defaults to nil. # @option [Hash] :context The collection context. This value will be passed as argument to the collection # May be SQL condition or any other filter you have defined on the collection. + # @option [Boolean, Array<String>] :eager_load_lazy_attributes A list of lazy document attributes to include in the bulk index request. + # Or pass `true` to include all lazy attributes. + # @option [Boolean, Array<String>] :update_lazy_attributes A list of lazy document attributes to bulk update each after the bulk import. + # Or pass `true` to update all lazy attributes. + # @option [Boolean, Array<String>] :preload_lazy_attributes A list of lazy document attributes to preload using search API before the bulk import. + # Or pass `true` to preload all lazy attributes. 
# @return [Numeric] The number of documents imported - def import(*repo_types, context: {}, eager_include_document_attributes: false, lazy_update_document_attributes: false, suffix: nil, **options) + def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_lazy_attributes: false, preload_lazy_attributes: false, suffix: nil, **options) repo_types = repo_hash.keys if repo_types.empty? count = 0 + if options.key?(:eager_include_document_attributes) + warn 'The `eager_include_document_attributes` option is deprecated. Use `eager_load_lazy_attributes` instead.' + eager_load_lazy_attributes = options.delete(:eager_include_document_attributes) + end + if options.key?(:lazy_update_document_attributes) + warn 'The `lazy_update_document_attributes` option is deprecated. Use `update_lazy_attributes` instead.' + update_lazy_attributes = options.delete(:lazy_update_document_attributes) + end + repo_hash.slice(*repo_types).each do |repo_name, repo| - doc_attrs = {eager: [], lazy: []} - doc_attrs[:eager] = repo.lazy_document_attribute_names(eager_include_document_attributes) - doc_attrs[:lazy] = repo.lazy_document_attribute_names(lazy_update_document_attributes) - doc_attrs[:lazy] -= doc_attrs[:eager] + # Elasticsearch 6.x and older have multiple types per index. + # This gem supports multiple types per index for backward compatibility, but we recommend to update + # your elasticsearch to a at least 7.x version and use a single type per index. + # + # Note that the repository name will be used as the document type. + # mapping_default_type + bulk_kwargs = { suffix: suffix, type: repo_name, **options } + cluster.may_update_type!(bulk_kwargs) context ||= {} - context[:lazy_attributes] = doc_attrs[:eager] if doc_attrs[:eager].any? 
+ context[:eager_load_lazy_attributes] = eager_load_lazy_attributes + context[:preload_lazy_attributes] = preload_lazy_attributes repo.each_serialized_batch(**context) do |batch| - # Elasticsearch 6.x and older have multiple types per index. - # This gem supports multiple types per index for backward compatibility, but we recommend to update - # your elasticsearch to a at least 7.x version and use a single type per index. - # - # Note that the repository name will be used as the document type. - # mapping_default_type - kwargs = { suffix: suffix, type: repo_name, **options } - cluster.may_update_type!(kwargs) + bulk(**bulk_kwargs, index: batch) - bulk(**kwargs, index: batch) + if update_lazy_attributes != false + attrs = repo.lazy_document_attribute_names(update_lazy_attributes) + attrs -= repo.lazy_document_attribute_names(eager_load_lazy_attributes) + update_attrs = attrs.each_with_object(Hash.new { |h, k| h[k] = {} }) do |attr_name, memo| + filtered_docs = batch.reject do |doc| + doc.ignore_on_index? || doc.mutations.key?(attr_name) + end + next if filtered_docs.empty? - doc_attrs[:lazy].each do |attr_name| - partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?)) - next if partial_docs.empty? - - bulk(**kwargs, update: partial_docs) + repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).each do |doc, value| + memo[doc.doc_header][attr_name] = value + end + end + if update_attrs.any? + bulk_update = update_attrs.map do |header, values| + header.merge(data: {doc: values}) + end + bulk(**bulk_kwargs, update: bulk_update) + end end count += batch.size end end