module Searchkick class Index attr_reader :name, :options def initialize(name, options = {}) @name = name @options = options @klass_document_type = {} # cache end def index_options IndexOptions.new(self).index_options end def create(body = {}) client.indices.create index: name, body: body end def delete if alias_exists? # can't call delete directly on aliases in ES 6 indices = client.indices.get_alias(name: name).keys client.indices.delete index: indices else client.indices.delete index: name end end def exists? client.indices.exists index: name end def refresh client.indices.refresh index: name end def alias_exists? client.indices.exists_alias name: name end # call to_h for consistent results between elasticsearch gem 7 and 8 # could do for all API calls, but just do for ones where return value is focus for now def mapping client.indices.get_mapping(index: name).to_h end # call to_h for consistent results between elasticsearch gem 7 and 8 def settings client.indices.get_settings(index: name).to_h end def refresh_interval index_settings["refresh_interval"] end def update_settings(settings) client.indices.put_settings index: name, body: settings end def tokens(text, options = {}) client.indices.analyze(body: {text: text}.merge(options), index: name)["tokens"].map { |t| t["token"] } end def total_docs response = client.search( index: name, body: { query: {match_all: {}}, size: 0 } ) Searchkick::Results.new(nil, response).total_count end def promote(new_name, update_refresh_interval: false) if update_refresh_interval new_index = Searchkick::Index.new(new_name, @options) settings = options[:settings] || {} refresh_interval = (settings[:index] && settings[:index][:refresh_interval]) || "1s" new_index.update_settings(index: {refresh_interval: refresh_interval}) end old_indices = begin client.indices.get_alias(name: name).keys rescue => e raise e unless Searchkick.not_found_error?(e) {} end actions = old_indices.map { |old_name| {remove: {index: old_name, alias: name}} } + [{add: {index: new_name, alias: name}}] client.indices.update_aliases body: {actions: actions} end alias_method :swap, :promote def retrieve(record) record_data = RecordData.new(self, record).record_data # remove underscore get_options = record_data.to_h { |k, v| [k.to_s.sub(/\A_/, "").to_sym, v] } client.get(get_options)["_source"] end def all_indices(unaliased: false) indices = begin if client.indices.respond_to?(:get_alias) client.indices.get_alias(index: "#{name}*") else client.indices.get_aliases end rescue => e raise e unless Searchkick.not_found_error?(e) {} end indices = indices.select { |_k, v| v.empty? || v["aliases"].empty? } if unaliased indices.select { |k, _v| k =~ /\A#{Regexp.escape(name)}_\d{14,17}\z/ }.keys end # remove old indices that start w/ index_name def clean_indices indices = all_indices(unaliased: true) indices.each do |index| Searchkick::Index.new(index).delete end indices end def store(record) notify(record, "Store") do queue_index([record]) end end def remove(record) notify(record, "Remove") do queue_delete([record]) end end def update_record(record, method_name) notify(record, "Update") do queue_update([record], method_name) end end def bulk_delete(records) return if records.empty? notify_bulk(records, "Delete") do queue_delete(records) end end def bulk_index(records) return if records.empty? notify_bulk(records, "Import") do queue_index(records) end end alias_method :import, :bulk_index def bulk_update(records, method_name) return if records.empty? notify_bulk(records, "Update") do queue_update(records, method_name) end end def search_id(record) RecordData.new(self, record).search_id end def document_type(record) RecordData.new(self, record).document_type end def similar_record(record, **options) options[:per_page] ||= 10 options[:similar] = [RecordData.new(self, record).record_data] options[:models] ||= [record.class] unless options.key?(:model) Searchkick.search("*", **options) end def reload_synonyms if Searchkick.opensearch? client.transport.perform_request "POST", "_plugins/_refresh_search_analyzers/#{CGI.escape(name)}" else raise Error, "Requires Elasticsearch 7.3+" if Searchkick.server_below?("7.3.0") begin client.transport.perform_request("GET", "#{CGI.escape(name)}/_reload_search_analyzers") rescue => e raise Error, "Requires non-OSS version of Elasticsearch" if Searchkick.not_allowed_error?(e) raise e end end end # queue def reindex_queue Searchkick::ReindexQueue.new(name) end # reindex # note: this is designed to be used internally # so it does not check object matches index class def reindex(object, method_name: nil, full: false, **options) if object.is_a?(Array) # note: purposefully skip full return reindex_records(object, method_name: method_name, **options) end if !object.respond_to?(:searchkick_klass) raise Error, "Cannot reindex object" end scoped = Searchkick.relation?(object) # call searchkick_klass for inheritance relation = scoped ? object.all : Searchkick.scope(object.searchkick_klass).all refresh = options.fetch(:refresh, !scoped) options.delete(:refresh) if method_name || (scoped && !full) mode = options.delete(:mode) || :inline raise ArgumentError, "unsupported keywords: #{options.keys.map(&:inspect).join(", ")}" if options.any? # import only import_scope(relation, method_name: method_name, mode: mode) self.refresh if refresh true else full_reindex(relation, **options) end end def create_index(index_options: nil) index_options ||= self.index_options index = Searchkick::Index.new("#{name}_#{Time.now.strftime('%Y%m%d%H%M%S%L')}", @options) index.create(index_options) index end def import_scope(relation, **options) relation_indexer.reindex(relation, **options) end def batches_left relation_indexer.batches_left end # private def klass_document_type(klass, ignore_type = false) @klass_document_type[[klass, ignore_type]] ||= begin if !ignore_type && klass.searchkick_klass.searchkick_options[:_type] type = klass.searchkick_klass.searchkick_options[:_type] type = type.call if type.respond_to?(:call) type else klass.model_name.to_s.underscore end end end # private def conversions_fields @conversions_fields ||= begin conversions = Array(options[:conversions]) conversions.map(&:to_s) + conversions.map(&:to_sym) end end # private def suggest_fields @suggest_fields ||= Array(options[:suggest]).map(&:to_s) end # private def locations_fields @locations_fields ||= begin locations = Array(options[:locations]) locations.map(&:to_s) + locations.map(&:to_sym) end end # private def uuid index_settings["uuid"] end protected def client Searchkick.client end def queue_index(records) Searchkick.indexer.queue(records.map { |r| RecordData.new(self, r).index_data }) end def queue_delete(records) Searchkick.indexer.queue(records.reject { |r| r.id.blank? }.map { |r| RecordData.new(self, r).delete_data }) end def queue_update(records, method_name) Searchkick.indexer.queue(records.map { |r| RecordData.new(self, r).update_data(method_name) }) end def relation_indexer @relation_indexer ||= RelationIndexer.new(self) end def index_settings settings.values.first["settings"]["index"] end def import_before_promotion(index, relation, **import_options) index.import_scope(relation, **import_options) end def reindex_records(object, mode: nil, refresh: false, **options) mode ||= Searchkick.callbacks_value || @options[:callbacks] || true mode = :inline if mode == :bulk result = RecordIndexer.new(self).reindex(object, mode: mode, full: false, **options) self.refresh if refresh result end # https://gist.github.com/jarosan/3124884 # http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/ # TODO deprecate async in favor of mode: :async, wait: true/false def full_reindex(relation, import: true, resume: false, retain: false, async: false, refresh_interval: nil, scope: nil) if resume index_name = all_indices.sort.last raise Searchkick::Error, "No index to resume" unless index_name index = Searchkick::Index.new(index_name, @options) else clean_indices unless retain index_options = relation.searchkick_index_options index_options.deep_merge!(settings: {index: {refresh_interval: refresh_interval}}) if refresh_interval index = create_index(index_options: index_options) end import_options = { mode: (async ? :async : :inline), full: true, resume: resume, scope: scope } uuid = index.uuid # check if alias exists alias_exists = alias_exists? if alias_exists import_before_promotion(index, relation, **import_options) if import # get existing indices to remove unless async check_uuid(uuid, index.uuid) promote(index.name, update_refresh_interval: !refresh_interval.nil?) clean_indices unless retain end else delete if exists? promote(index.name, update_refresh_interval: !refresh_interval.nil?) # import after promotion index.import_scope(relation, **import_options) if import end if async if async.is_a?(Hash) && async[:wait] puts "Created index: #{index.name}" puts "Jobs queued. Waiting..." loop do sleep 3 status = Searchkick.reindex_status(index.name) break if status[:completed] puts "Batches left: #{status[:batches_left]}" end # already promoted if alias didn't exist if alias_exists puts "Jobs complete. Promoting..." check_uuid(uuid, index.uuid) promote(index.name, update_refresh_interval: !refresh_interval.nil?) end clean_indices unless retain puts "SUCCESS!" end {index_name: index.name} else index.refresh true end rescue => e if Searchkick.transport_error?(e) && e.message.include?("No handler for type [text]") raise UnsupportedVersionError end raise e end # safety check # still a chance for race condition since its called before promotion # ideal is for user to disable automatic index creation # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-creation def check_uuid(old_uuid, new_uuid) if old_uuid != new_uuid raise Searchkick::Error, "Safety check failed - only run one Model.reindex per model at a time" end end def notify(record, name) if Searchkick.callbacks_value == :bulk yield else name = "#{record.class.searchkick_klass.name} #{name}" if record && record.class.searchkick_klass event = { name: name, id: search_id(record) } ActiveSupport::Notifications.instrument("request.searchkick", event) do yield end end end def notify_bulk(records, name) if Searchkick.callbacks_value == :bulk yield else event = { name: "#{records.first.class.searchkick_klass.name} #{name}", count: records.size } ActiveSupport::Notifications.instrument("request.searchkick", event) do yield end end end end end