lib/searchkick/index.rb in searchkick-4.6.3 vs lib/searchkick/index.rb in searchkick-5.0.0

- old
+ new

@@ -1,7 +1,5 @@ -require "searchkick/index_options" - module Searchkick class Index attr_reader :name, :options def initialize(name, options = {}) @@ -38,16 +36,19 @@ def alias_exists? client.indices.exists_alias name: name end + # call to_h for consistent results between elasticsearch gem 7 and 8 + # could do for all API calls, but just do for ones where return value is focus for now def mapping - client.indices.get_mapping index: name + client.indices.get_mapping(index: name).to_h end + # call to_h for consistent results between elasticsearch gem 7 and 8 def settings - client.indices.get_settings index: name + client.indices.get_settings(index: name).to_h end def refresh_interval index_settings["refresh_interval"] end @@ -95,11 +96,11 @@ def retrieve(record) record_data = RecordData.new(self, record).record_data # remove underscore - get_options = Hash[record_data.map { |k, v| [k.to_s.sub(/\A_/, "").to_sym, v] }] + get_options = record_data.to_h { |k, v| [k.to_s.sub(/\A_/, "").to_sym, v] } client.get(get_options)["_source"] end def all_indices(unaliased: false) @@ -125,71 +126,79 @@ Searchkick::Index.new(index).delete end indices end - # record based - # use helpers for notifications - def store(record) - bulk_indexer.bulk_index([record]) + notify(record, "Store") do + queue_index([record]) + end end def remove(record) - bulk_indexer.bulk_delete([record]) + notify(record, "Remove") do + queue_delete([record]) + end end def update_record(record, method_name) - bulk_indexer.bulk_update([record], method_name) + notify(record, "Update") do + queue_update([record], method_name) + end end def bulk_delete(records) - bulk_indexer.bulk_delete(records) + return if records.empty? + + notify_bulk(records, "Delete") do + queue_delete(records) + end end def bulk_index(records) - bulk_indexer.bulk_index(records) + return if records.empty? + + notify_bulk(records, "Import") do + queue_index(records) + end end alias_method :import, :bulk_index def bulk_update(records, method_name) - bulk_indexer.bulk_update(records, method_name) + return if records.empty? + + notify_bulk(records, "Update") do + queue_update(records, method_name) + end end def search_id(record) RecordData.new(self, record).search_id end def document_type(record) RecordData.new(self, record).document_type end - # TODO use like: [{_index: ..., _id: ...}] in Searchkick 5 def similar_record(record, **options) - like_text = retrieve(record).to_hash - .keep_if { |k, _| !options[:fields] || options[:fields].map(&:to_s).include?(k) } - .values.compact.join(" ") - - options[:where] ||= {} - options[:where][:_id] ||= {} - options[:where][:_id][:not] = Array(options[:where][:_id][:not]) + [record.id.to_s] options[:per_page] ||= 10 - options[:similar] = true + options[:similar] = [RecordData.new(self, record).record_data] + options[:models] ||= [record.class] unless options.key?(:model) - # TODO use index class instead of record class - Searchkick.search(like_text, model: record.class, **options) + Searchkick.search("*", **options) end def reload_synonyms if Searchkick.opensearch? client.transport.perform_request "POST", "_plugins/_refresh_search_analyzers/#{CGI.escape(name)}" else raise Error, "Requires Elasticsearch 7.3+" if Searchkick.server_below?("7.3.0") begin client.transport.perform_request("GET", "#{CGI.escape(name)}/_reload_search_analyzers") - rescue Elasticsearch::Transport::Transport::Errors::MethodNotAllowed - raise Error, "Requires non-OSS version of Elasticsearch" + rescue => e + raise Error, "Requires non-OSS version of Elasticsearch" if Searchkick.not_allowed_error?(e) + raise e end end end # queue @@ -198,33 +207,39 @@ Searchkick::ReindexQueue.new(name) end # reindex - def reindex(relation, method_name, scoped:, full: false, scope: nil, **options) + # note: this is designed to be used internally + # so it does not check object matches index class + def reindex(object, method_name: nil, full: false, **options) + if object.is_a?(Array) + # note: purposefully skip full + return reindex_records(object, method_name: method_name, **options) + end + + if !object.respond_to?(:searchkick_klass) + raise Error, "Cannot reindex object" + end + + scoped = Searchkick.relation?(object) + # call searchkick_klass for inheritance + relation = scoped ? object.all : Searchkick.scope(object.searchkick_klass).all + refresh = options.fetch(:refresh, !scoped) options.delete(:refresh) - if method_name - # TODO throw ArgumentError - Searchkick.warn("unsupported keywords: #{options.keys.map(&:inspect).join(", ")}") if options.any? + if method_name || (scoped && !full) + mode = options.delete(:mode) || :inline + raise ArgumentError, "unsupported keywords: #{options.keys.map(&:inspect).join(", ")}" if options.any? - # update - import_scope(relation, method_name: method_name, scope: scope) + # import only + import_scope(relation, method_name: method_name, mode: mode) self.refresh if refresh true - elsif scoped && !full - # TODO throw ArgumentError - Searchkick.warn("unsupported keywords: #{options.keys.map(&:inspect).join(", ")}") if options.any? - - # reindex association - import_scope(relation, scope: scope) - self.refresh if refresh - true else - # full reindex - reindex_scope(relation, scope: scope, **options) + full_reindex(relation, **options) end end def create_index(index_options: nil) index_options ||= self.index_options @@ -232,19 +247,18 @@ index.create(index_options) index end def import_scope(relation, **options) - bulk_indexer.import_scope(relation, **options) + relation_indexer.reindex(relation, **options) end def batches_left - bulk_indexer.batches_left + relation_indexer.batches_left end - # other - + # private def klass_document_type(klass, ignore_type = false) @klass_document_type[[klass, ignore_type]] ||= begin if !ignore_type && klass.searchkick_klass.searchkick_options[:_type] type = klass.searchkick_klass.searchkick_options[:_type] type = type.call if type.respond_to?(:call) @@ -253,22 +267,24 @@ klass.model_name.to_s.underscore end end end - # should not be public + # private def conversions_fields @conversions_fields ||= begin conversions = Array(options[:conversions]) conversions.map(&:to_s) + conversions.map(&:to_sym) end end + # private def suggest_fields @suggest_fields ||= Array(options[:suggest]).map(&:to_s) end + # private def locations_fields @locations_fields ||= begin locations = Array(options[:locations]) locations.map(&:to_s) + locations.map(&:to_sym) end @@ -283,25 +299,47 @@ def client Searchkick.client end - def bulk_indexer - @bulk_indexer ||= BulkIndexer.new(self) + def queue_index(records) + Searchkick.indexer.queue(records.map { |r| RecordData.new(self, r).index_data }) end + def queue_delete(records) + Searchkick.indexer.queue(records.reject { |r| r.id.blank? }.map { |r| RecordData.new(self, r).delete_data }) + end + + def queue_update(records, method_name) + Searchkick.indexer.queue(records.map { |r| RecordData.new(self, r).update_data(method_name) }) + end + + def relation_indexer + @relation_indexer ||= RelationIndexer.new(self) + end + def index_settings settings.values.first["settings"]["index"] end def import_before_promotion(index, relation, **import_options) index.import_scope(relation, **import_options) end + def reindex_records(object, mode: nil, refresh: false, **options) + mode ||= Searchkick.callbacks_value || @options[:callbacks] || true + mode = :inline if mode == :bulk + + result = RecordIndexer.new(self).reindex(object, mode: mode, full: false, **options) + self.refresh if refresh + result + end + # https://gist.github.com/jarosan/3124884 # http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/ - def reindex_scope(relation, import: true, resume: false, retain: false, async: false, refresh_interval: nil, scope: nil) + # TODO deprecate async in favor of mode: :async, wait: true/false + def full_reindex(relation, import: true, resume: false, retain: false, async: false, refresh_interval: nil, scope: nil) if resume index_name = all_indices.sort.last raise Searchkick::Error, "No index to resume" unless index_name index = Searchkick::Index.new(index_name, @options) else @@ -311,13 +349,13 @@ index_options.deep_merge!(settings: {index: {refresh_interval: refresh_interval}}) if refresh_interval index = create_index(index_options: index_options) end import_options = { - resume: resume, - async: async, + mode: (async ? :async : :inline), full: true, + resume: resume, scope: scope } uuid = index.uuid @@ -365,11 +403,11 @@ index.refresh true end rescue => e if Searchkick.transport_error?(e) && e.message.include?("No handler for type [text]") - raise UnsupportedVersionError, "This version of Searchkick requires Elasticsearch 6 or greater" + raise UnsupportedVersionError end raise e end @@ -378,9 +416,38 @@ # ideal is for user to disable automatic index creation # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-creation def check_uuid(old_uuid, new_uuid) if old_uuid != new_uuid raise Searchkick::Error, "Safety check failed - only run one Model.reindex per model at a time" + end + end + + def notify(record, name) + if Searchkick.callbacks_value == :bulk + yield + else + name = "#{record.class.searchkick_klass.name} #{name}" if record && record.class.searchkick_klass + event = { + name: name, + id: search_id(record) + } + ActiveSupport::Notifications.instrument("request.searchkick", event) do + yield + end + end + end + + def notify_bulk(records, name) + if Searchkick.callbacks_value == :bulk + yield + else + event = { + name: "#{records.first.class.searchkick_klass.name} #{name}", + count: records.size + } + ActiveSupport::Notifications.instrument("request.searchkick", event) do + yield + end end end end end