lib/esse/index/documents.rb in esse-0.3.5 vs lib/esse/index/documents.rb in esse-0.4.0.rc1
- old
+ new
@@ -112,11 +112,11 @@
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/7.5/docs-update.html
def update(doc = nil, suffix: nil, **options)
if document?(doc)
options[:id] = doc.id
- options[:body] = { doc: doc.source }
+ options[:body] = { doc: doc.mutated_source }
options[:type] = doc.type if doc.type?
options[:routing] = doc.routing if doc.routing?
end
require_kwargs!(options, :id, :body)
options[:index] = index_name(suffix: suffix)
@@ -138,11 +138,11 @@
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/7.5/docs-index_.html
def index(doc = nil, suffix: nil, **options)
if document?(doc)
options[:id] = doc.id
- options[:body] = doc.source
+ options[:body] = doc.mutated_source
options[:type] = doc.type if doc.type?
options[:routing] = doc.routing if doc.routing?
end
require_kwargs!(options, :id, :body)
options[:index] = index_name(suffix: suffix)
@@ -169,17 +169,57 @@
index: index_name(suffix: suffix),
type: type,
}.merge(options)
cluster.may_update_type!(definition)
+ to_index = []
+ to_create = []
+ to_update = []
+ to_delete = []
+ Esse::ArrayUtils.wrap(index).each do |doc|
+ if doc.is_a?(Hash)
+ to_index << doc
+ elsif Esse.document?(doc) && !doc.ignore_on_index?
+ hash = doc.to_bulk
+ hash[:_type] ||= type if type
+ to_index << hash
+ end
+ end
+ Esse::ArrayUtils.wrap(create).each do |doc|
+ if doc.is_a?(Hash)
+ to_create << doc
+ elsif Esse.document?(doc) && !doc.ignore_on_index?
+ hash = doc.to_bulk
+ hash[:_type] ||= type if type
+ to_create << hash
+ end
+ end
+ Esse::ArrayUtils.wrap(update).each do |doc|
+ if doc.is_a?(Hash)
+ to_update << doc
+ elsif Esse.document?(doc) && !doc.ignore_on_index?
+ hash = doc.to_bulk(operation: :update)
+ hash[:_type] ||= type if type
+ to_update << hash
+ end
+ end
+ Esse::ArrayUtils.wrap(delete).each do |doc|
+ if doc.is_a?(Hash)
+ to_delete << doc
+ elsif Esse.document?(doc) && !doc.ignore_on_delete?
+ hash = doc.to_bulk(data: false)
+ hash[:_type] ||= type if type
+ to_delete << hash
+ end
+ end
+
# @TODO Wrap the return in some other Stats object with more information
Esse::Import::Bulk.new(
- **definition.slice(:type),
- create: create,
- delete: delete,
- index: index,
- update: update,
+ create: to_create,
+ delete: to_delete,
+ index: to_index,
+ update: to_update,
).each_request do |request_body|
cluster.api.bulk(**definition, body: request_body.body) do |event_payload|
event_payload[:body_stats] = request_body.stats
if bulk_wait_interval > 0
event_payload[:wait_interval] = bulk_wait_interval
@@ -196,39 +236,64 @@
# @param repos [Array<String>] List of repo types. Defaults to all types.
# @param options [Hash] Hash of parameters that will be passed along to elasticsearch request
# @option [String, nil] :suffix The index suffix. Defaults to nil.
# @option [Hash] :context The collection context. This value will be passed as argument to the collection
# May be SQL condition or any other filter you have defined on the collection.
+ # @option [Boolean, Array<String>] :eager_load_lazy_attributes A list of lazy document attributes to include in the bulk index request.
+ # Or pass `true` to include all lazy attributes.
+ # @option [Boolean, Array<String>] :update_lazy_attributes A list of lazy document attributes to bulk update after each bulk import.
+ # Or pass `true` to update all lazy attributes.
+ # @option [Boolean, Array<String>] :preload_lazy_attributes A list of lazy document attributes to preload using search API before the bulk import.
+ # Or pass `true` to preload all lazy attributes.
# @return [Numeric] The number of documents imported
- def import(*repo_types, context: {}, eager_include_document_attributes: false, lazy_update_document_attributes: false, suffix: nil, **options)
+ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_lazy_attributes: false, preload_lazy_attributes: false, suffix: nil, **options)
repo_types = repo_hash.keys if repo_types.empty?
count = 0
+ if options.key?(:eager_include_document_attributes)
+ warn 'The `eager_include_document_attributes` option is deprecated. Use `eager_load_lazy_attributes` instead.'
+ eager_load_lazy_attributes = options.delete(:eager_include_document_attributes)
+ end
+ if options.key?(:lazy_update_document_attributes)
+ warn 'The `lazy_update_document_attributes` option is deprecated. Use `update_lazy_attributes` instead.'
+ update_lazy_attributes = options.delete(:lazy_update_document_attributes)
+ end
+
repo_hash.slice(*repo_types).each do |repo_name, repo|
- doc_attrs = {eager: [], lazy: []}
- doc_attrs[:eager] = repo.lazy_document_attribute_names(eager_include_document_attributes)
- doc_attrs[:lazy] = repo.lazy_document_attribute_names(lazy_update_document_attributes)
- doc_attrs[:lazy] -= doc_attrs[:eager]
+ # Elasticsearch 6.x and older have multiple types per index.
+ # This gem supports multiple types per index for backward compatibility, but we recommend updating
+ # your elasticsearch to at least version 7.x and using a single type per index.
+ #
+ # Note that the repository name will be used as the document type.
+ # mapping_default_type
+ bulk_kwargs = { suffix: suffix, type: repo_name, **options }
+ cluster.may_update_type!(bulk_kwargs)
context ||= {}
- context[:lazy_attributes] = doc_attrs[:eager] if doc_attrs[:eager].any?
+ context[:eager_load_lazy_attributes] = eager_load_lazy_attributes
+ context[:preload_lazy_attributes] = preload_lazy_attributes
repo.each_serialized_batch(**context) do |batch|
- # Elasticsearch 6.x and older have multiple types per index.
- # This gem supports multiple types per index for backward compatibility, but we recommend to update
- # your elasticsearch to a at least 7.x version and use a single type per index.
- #
- # Note that the repository name will be used as the document type.
- # mapping_default_type
- kwargs = { suffix: suffix, type: repo_name, **options }
- cluster.may_update_type!(kwargs)
+ bulk(**bulk_kwargs, index: batch)
- bulk(**kwargs, index: batch)
+ if update_lazy_attributes != false
+ attrs = repo.lazy_document_attribute_names(update_lazy_attributes)
+ attrs -= repo.lazy_document_attribute_names(eager_load_lazy_attributes)
+ update_attrs = attrs.each_with_object(Hash.new { |h, k| h[k] = {} }) do |attr_name, memo|
+ filtered_docs = batch.reject do |doc|
+ doc.ignore_on_index? || doc.mutations.key?(attr_name)
+ end
+ next if filtered_docs.empty?
- doc_attrs[:lazy].each do |attr_name|
- partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?))
- next if partial_docs.empty?
-
- bulk(**kwargs, update: partial_docs)
+ repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).each do |doc, value|
+ memo[doc.doc_header][attr_name] = value
+ end
+ end
+ if update_attrs.any?
+ bulk_update = update_attrs.map do |header, values|
+ header.merge(data: {doc: values})
+ end
+ bulk(**bulk_kwargs, update: bulk_update)
+ end
end
count += batch.size
end
end