lib/searchkick/index.rb in searchkick-4.6.3 vs lib/searchkick/index.rb in searchkick-5.0.0
- old
+ new
@@ -1,7 +1,5 @@
-require "searchkick/index_options"
-
module Searchkick
class Index
attr_reader :name, :options
def initialize(name, options = {})
@@ -38,16 +36,19 @@
def alias_exists?
client.indices.exists_alias name: name
end
+ # call to_h for consistent results between elasticsearch gem 7 and 8
+ # could do for all API calls, but just do for ones where return value is focus for now
def mapping
- client.indices.get_mapping index: name
+ client.indices.get_mapping(index: name).to_h
end
+ # call to_h for consistent results between elasticsearch gem 7 and 8
def settings
- client.indices.get_settings index: name
+ client.indices.get_settings(index: name).to_h
end
def refresh_interval
index_settings["refresh_interval"]
end
@@ -95,11 +96,11 @@
def retrieve(record)
record_data = RecordData.new(self, record).record_data
# remove underscore
- get_options = Hash[record_data.map { |k, v| [k.to_s.sub(/\A_/, "").to_sym, v] }]
+ get_options = record_data.to_h { |k, v| [k.to_s.sub(/\A_/, "").to_sym, v] }
client.get(get_options)["_source"]
end
def all_indices(unaliased: false)
@@ -125,71 +126,79 @@
Searchkick::Index.new(index).delete
end
indices
end
- # record based
- # use helpers for notifications
-
def store(record)
- bulk_indexer.bulk_index([record])
+ notify(record, "Store") do
+ queue_index([record])
+ end
end
def remove(record)
- bulk_indexer.bulk_delete([record])
+ notify(record, "Remove") do
+ queue_delete([record])
+ end
end
def update_record(record, method_name)
- bulk_indexer.bulk_update([record], method_name)
+ notify(record, "Update") do
+ queue_update([record], method_name)
+ end
end
def bulk_delete(records)
- bulk_indexer.bulk_delete(records)
+ return if records.empty?
+
+ notify_bulk(records, "Delete") do
+ queue_delete(records)
+ end
end
def bulk_index(records)
- bulk_indexer.bulk_index(records)
+ return if records.empty?
+
+ notify_bulk(records, "Import") do
+ queue_index(records)
+ end
end
alias_method :import, :bulk_index
def bulk_update(records, method_name)
- bulk_indexer.bulk_update(records, method_name)
+ return if records.empty?
+
+ notify_bulk(records, "Update") do
+ queue_update(records, method_name)
+ end
end
def search_id(record)
RecordData.new(self, record).search_id
end
def document_type(record)
RecordData.new(self, record).document_type
end
- # TODO use like: [{_index: ..., _id: ...}] in Searchkick 5
def similar_record(record, **options)
- like_text = retrieve(record).to_hash
- .keep_if { |k, _| !options[:fields] || options[:fields].map(&:to_s).include?(k) }
- .values.compact.join(" ")
-
- options[:where] ||= {}
- options[:where][:_id] ||= {}
- options[:where][:_id][:not] = Array(options[:where][:_id][:not]) + [record.id.to_s]
options[:per_page] ||= 10
- options[:similar] = true
+ options[:similar] = [RecordData.new(self, record).record_data]
+ options[:models] ||= [record.class] unless options.key?(:model)
- # TODO use index class instead of record class
- Searchkick.search(like_text, model: record.class, **options)
+ Searchkick.search("*", **options)
end
def reload_synonyms
if Searchkick.opensearch?
client.transport.perform_request "POST", "_plugins/_refresh_search_analyzers/#{CGI.escape(name)}"
else
raise Error, "Requires Elasticsearch 7.3+" if Searchkick.server_below?("7.3.0")
begin
client.transport.perform_request("GET", "#{CGI.escape(name)}/_reload_search_analyzers")
- rescue Elasticsearch::Transport::Transport::Errors::MethodNotAllowed
- raise Error, "Requires non-OSS version of Elasticsearch"
+ rescue => e
+ raise Error, "Requires non-OSS version of Elasticsearch" if Searchkick.not_allowed_error?(e)
+ raise e
end
end
end
# queue
@@ -198,33 +207,39 @@
Searchkick::ReindexQueue.new(name)
end
# reindex
- def reindex(relation, method_name, scoped:, full: false, scope: nil, **options)
+ # note: this is designed to be used internally
+ # so it does not check object matches index class
+ def reindex(object, method_name: nil, full: false, **options)
+ if object.is_a?(Array)
+ # note: purposefully skip full
+ return reindex_records(object, method_name: method_name, **options)
+ end
+
+ if !object.respond_to?(:searchkick_klass)
+ raise Error, "Cannot reindex object"
+ end
+
+ scoped = Searchkick.relation?(object)
+ # call searchkick_klass for inheritance
+ relation = scoped ? object.all : Searchkick.scope(object.searchkick_klass).all
+
refresh = options.fetch(:refresh, !scoped)
options.delete(:refresh)
- if method_name
- # TODO throw ArgumentError
- Searchkick.warn("unsupported keywords: #{options.keys.map(&:inspect).join(", ")}") if options.any?
+ if method_name || (scoped && !full)
+ mode = options.delete(:mode) || :inline
+ raise ArgumentError, "unsupported keywords: #{options.keys.map(&:inspect).join(", ")}" if options.any?
- # update
- import_scope(relation, method_name: method_name, scope: scope)
+ # import only
+ import_scope(relation, method_name: method_name, mode: mode)
self.refresh if refresh
true
- elsif scoped && !full
- # TODO throw ArgumentError
- Searchkick.warn("unsupported keywords: #{options.keys.map(&:inspect).join(", ")}") if options.any?
-
- # reindex association
- import_scope(relation, scope: scope)
- self.refresh if refresh
- true
else
- # full reindex
- reindex_scope(relation, scope: scope, **options)
+ full_reindex(relation, **options)
end
end
def create_index(index_options: nil)
index_options ||= self.index_options
@@ -232,19 +247,18 @@
index.create(index_options)
index
end
def import_scope(relation, **options)
- bulk_indexer.import_scope(relation, **options)
+ relation_indexer.reindex(relation, **options)
end
def batches_left
- bulk_indexer.batches_left
+ relation_indexer.batches_left
end
- # other
-
+ # private
def klass_document_type(klass, ignore_type = false)
@klass_document_type[[klass, ignore_type]] ||= begin
if !ignore_type && klass.searchkick_klass.searchkick_options[:_type]
type = klass.searchkick_klass.searchkick_options[:_type]
type = type.call if type.respond_to?(:call)
@@ -253,22 +267,24 @@
klass.model_name.to_s.underscore
end
end
end
- # should not be public
+ # private
def conversions_fields
@conversions_fields ||= begin
conversions = Array(options[:conversions])
conversions.map(&:to_s) + conversions.map(&:to_sym)
end
end
+ # private
def suggest_fields
@suggest_fields ||= Array(options[:suggest]).map(&:to_s)
end
+ # private
def locations_fields
@locations_fields ||= begin
locations = Array(options[:locations])
locations.map(&:to_s) + locations.map(&:to_sym)
end
@@ -283,25 +299,47 @@
def client
Searchkick.client
end
- def bulk_indexer
- @bulk_indexer ||= BulkIndexer.new(self)
+ def queue_index(records)
+ Searchkick.indexer.queue(records.map { |r| RecordData.new(self, r).index_data })
end
+ def queue_delete(records)
+ Searchkick.indexer.queue(records.reject { |r| r.id.blank? }.map { |r| RecordData.new(self, r).delete_data })
+ end
+
+ def queue_update(records, method_name)
+ Searchkick.indexer.queue(records.map { |r| RecordData.new(self, r).update_data(method_name) })
+ end
+
+ def relation_indexer
+ @relation_indexer ||= RelationIndexer.new(self)
+ end
+
def index_settings
settings.values.first["settings"]["index"]
end
def import_before_promotion(index, relation, **import_options)
index.import_scope(relation, **import_options)
end
+ def reindex_records(object, mode: nil, refresh: false, **options)
+ mode ||= Searchkick.callbacks_value || @options[:callbacks] || true
+ mode = :inline if mode == :bulk
+
+ result = RecordIndexer.new(self).reindex(object, mode: mode, full: false, **options)
+ self.refresh if refresh
+ result
+ end
+
# https://gist.github.com/jarosan/3124884
# http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/
- def reindex_scope(relation, import: true, resume: false, retain: false, async: false, refresh_interval: nil, scope: nil)
+ # TODO deprecate async in favor of mode: :async, wait: true/false
+ def full_reindex(relation, import: true, resume: false, retain: false, async: false, refresh_interval: nil, scope: nil)
if resume
index_name = all_indices.sort.last
raise Searchkick::Error, "No index to resume" unless index_name
index = Searchkick::Index.new(index_name, @options)
else
@@ -311,13 +349,13 @@
index_options.deep_merge!(settings: {index: {refresh_interval: refresh_interval}}) if refresh_interval
index = create_index(index_options: index_options)
end
import_options = {
- resume: resume,
- async: async,
+ mode: (async ? :async : :inline),
full: true,
+ resume: resume,
scope: scope
}
uuid = index.uuid
@@ -365,11 +403,11 @@
index.refresh
true
end
rescue => e
if Searchkick.transport_error?(e) && e.message.include?("No handler for type [text]")
- raise UnsupportedVersionError, "This version of Searchkick requires Elasticsearch 6 or greater"
+ raise UnsupportedVersionError
end
raise e
end
@@ -378,9 +416,38 @@
# ideal is for user to disable automatic index creation
# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-creation
def check_uuid(old_uuid, new_uuid)
if old_uuid != new_uuid
raise Searchkick::Error, "Safety check failed - only run one Model.reindex per model at a time"
+ end
+ end
+
+ def notify(record, name)
+ if Searchkick.callbacks_value == :bulk
+ yield
+ else
+ name = "#{record.class.searchkick_klass.name} #{name}" if record && record.class.searchkick_klass
+ event = {
+ name: name,
+ id: search_id(record)
+ }
+ ActiveSupport::Notifications.instrument("request.searchkick", event) do
+ yield
+ end
+ end
+ end
+
+ def notify_bulk(records, name)
+ if Searchkick.callbacks_value == :bulk
+ yield
+ else
+ event = {
+ name: "#{records.first.class.searchkick_klass.name} #{name}",
+ count: records.size
+ }
+ ActiveSupport::Notifications.instrument("request.searchkick", event) do
+ yield
+ end
end
end
end
end