lib/searchkick/index.rb in searchkick-2.5.0 vs lib/searchkick/index.rb in searchkick-3.0.0

- old
+ new

@@ -1,5 +1,7 @@ +require "searchkick/index_options" + module Searchkick class Index include IndexOptions attr_reader :name, :options @@ -50,13 +52,30 @@ def update_settings(settings) client.indices.put_settings index: name, body: settings end + def tokens(text, options = {}) + client.indices.analyze(body: {text: text}.merge(options), index: name)["tokens"].map { |t| t["token"] } + end + + def total_docs + response = + client.search( + index: name, + body: { + query: {match_all: {}}, + size: 0 + } + ) + + response["hits"]["total"] + end + def promote(new_name, update_refresh_interval: false) if update_refresh_interval - new_index = Searchkick::Index.new(new_name) + new_index = Searchkick::Index.new(new_name, @options) settings = options[:settings] || {} refresh_interval = (settings[:index] && settings[:index][:refresh_interval]) || "1s" new_index.update_settings(index: {refresh_interval: refresh_interval}) end @@ -69,80 +88,74 @@ actions = old_indices.map { |old_name| {remove: {index: old_name, alias: name}} } + [{add: {index: new_name, alias: name}}] client.indices.update_aliases body: {actions: actions} end alias_method :swap, :promote + def retrieve(record) + client.get( + index: name, + type: document_type(record), + id: search_id(record) + )["_source"] + end + + def all_indices(unaliased: false) + indices = + begin + client.indices.get_aliases + rescue Elasticsearch::Transport::Transport::Errors::NotFound + {} + end + indices = indices.select { |_k, v| v.empty? || v["aliases"].empty? } if unaliased + indices.select { |k, _v| k =~ /\A#{Regexp.escape(name)}_\d{14,17}\z/ }.keys + end + + # remove old indices that start w/ index_name + def clean_indices + indices = all_indices(unaliased: true) + indices.each do |index| + Searchkick::Index.new(index).delete + end + indices + end + # record based # use helpers for notifications def store(record) - bulk_index_helper([record]) + bulk_indexer.bulk_index([record]) end def remove(record) - bulk_delete_helper([record]) + bulk_indexer.bulk_delete([record]) end def update_record(record, method_name) - bulk_update_helper([record], method_name) + bulk_indexer.bulk_update([record], method_name) end def bulk_delete(records) - bulk_delete_helper(records) + bulk_indexer.bulk_delete(records) end def bulk_index(records) - bulk_index_helper(records) + bulk_indexer.bulk_index(records) end alias_method :import, :bulk_index def bulk_update(records, method_name) - bulk_update_helper(records, method_name) + bulk_indexer.bulk_update(records, method_name) end - def record_data(r) - data = { - _index: name, - _id: search_id(r), - _type: document_type(r) - } - data[:_routing] = r.search_routing if r.respond_to?(:search_routing) - data + def search_id(record) + RecordData.new(self, record).search_id end - def retrieve(record) - client.get( - index: name, - type: document_type(record), - id: search_id(record) - )["_source"] + def document_type(record) + RecordData.new(self, record).document_type end - def reindex_record(record) - if record.destroyed? || !record.should_index? - begin - remove(record) - rescue Elasticsearch::Transport::Transport::Errors::NotFound - # do nothing - end - else - store(record) - end - end - - def reindex_record_async(record) - if Searchkick.callbacks_value.nil? - if defined?(Searchkick::ReindexV2Job) - Searchkick::ReindexV2Job.perform_later(record.class.name, record.id.to_s) - else - raise Searchkick::Error, "Active Job not found" - end - else - reindex_record(record) - end - end - def similar_record(record, **options) like_text = retrieve(record).to_hash .keep_if { |k, _| !options[:fields] || options[:fields].map(&:to_s).include?(k) } .values.compact.join(" ") @@ -152,81 +165,105 @@ options[:where][:_id][:not] = record.id.to_s options[:per_page] ||= 10 options[:similar] = true # TODO use index class instead of record class - search_model(record.class, like_text, options) + Searchkick.search(like_text, model: record.class, **options) end # queue def reindex_queue Searchkick::ReindexQueue.new(name) end - # search + # reindex - # TODO remove in next major version - def search_model(searchkick_klass, term = "*", **options, &block) - query = Searchkick::Query.new(searchkick_klass, term, options) - yield(query.body) if block - if options[:execute] == false - query + def reindex(scope, method_name, scoped:, full: false, **options) + refresh = options.fetch(:refresh, !scoped) + + if method_name + # update + import_scope(scope, method_name: method_name) + self.refresh if refresh + true + elsif scoped && !full + # reindex association + import_scope(scope) + self.refresh if refresh + true else - query.execute + # full reindex + reindex_scope(scope, options) end end - # reindex - def create_index(index_options: nil) index_options ||= self.index_options index = Searchkick::Index.new("#{name}_#{Time.now.strftime('%Y%m%d%H%M%S%L')}", @options) index.create(index_options) index end - def all_indices(unaliased: false) - indices = - begin - client.indices.get_aliases - rescue Elasticsearch::Transport::Transport::Errors::NotFound - {} + def import_scope(scope, **options) + bulk_indexer.import_scope(scope, **options) + end + + def batches_left + bulk_indexer.batches_left + end + + # other + + def klass_document_type(klass, ignore_type = false) + @klass_document_type[[klass, ignore_type]] ||= begin + if !ignore_type && klass.searchkick_klass.searchkick_options[:_type] + type = klass.searchkick_klass.searchkick_options[:_type] + type = type.call if type.respond_to?(:call) + type + else + klass.model_name.to_s.underscore end - indices = indices.select { |_k, v| v.empty? || v["aliases"].empty? } if unaliased - indices.select { |k, _v| k =~ /\A#{Regexp.escape(name)}_\d{14,17}\z/ }.keys + end end - # remove old indices that start w/ index_name - def clean_indices - indices = all_indices(unaliased: true) - indices.each do |index| - Searchkick::Index.new(index).delete + # should not be public + def conversions_fields + @conversions_fields ||= begin + conversions = Array(options[:conversions]) + conversions.map(&:to_s) + conversions.map(&:to_sym) end - indices end - def total_docs - response = - client.search( - index: name, - body: { - query: {match_all: {}}, - size: 0 - } - ) + def suggest_fields + @suggest_fields ||= Array(options[:suggest]).map(&:to_s) + end - response["hits"]["total"] + def locations_fields + @locations_fields ||= begin + locations = Array(options[:locations]) + locations.map(&:to_s) + locations.map(&:to_sym) + end end + protected + + def client + Searchkick.client + end + + def bulk_indexer + @bulk_indexer ||= BulkIndexer.new(self) + end + # https://gist.github.com/jarosan/3124884 # http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/ def reindex_scope(scope, import: true, resume: false, retain: false, async: false, refresh_interval: nil) if resume index_name = all_indices.sort.last raise Searchkick::Error, "No index to resume" unless index_name - index = Searchkick::Index.new(index_name) + index = Searchkick::Index.new(index_name, @options) else clean_indices unless retain index_options = scope.searchkick_index_options index_options.deep_merge!(settings: {index: {refresh_interval: refresh_interval}}) if refresh_interval @@ -274,277 +311,14 @@ {index_name: index.name} else index.refresh true end - end - - def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false) - # use scope for import - scope = scope.search_import if scope.respond_to?(:search_import) - - if batch - import_or_update scope.to_a, method_name, async - Searchkick.with_redis { |r| r.srem(batches_key, batch_id) } if batch_id - elsif full && async - full_reindex_async(scope) - elsif scope.respond_to?(:find_in_batches) - if resume - # use total docs instead of max id since there's not a great way - # to get the max _id without scripting since it's a string - - # TODO use primary key and prefix with table name - scope = scope.where("id > ?", total_docs) - end - - scope = scope.select("id").except(:includes, :preload) if async - - scope.find_in_batches batch_size: batch_size do |items| - import_or_update items, method_name, async - end - else - each_batch(scope) do |items| - import_or_update items, method_name, async - end + rescue Elasticsearch::Transport::Transport::Errors::BadRequest => e + if e.message.include?("No handler for type [text]") + raise UnsupportedVersionError, "This version of Searchkick requires Elasticsearch 5 or greater" end - end - def batches_left - Searchkick.with_redis { |r| r.scard(batches_key) } - end - - # other - - def tokens(text, options = {}) - client.indices.analyze(body: {text: text}.merge(options), index: name)["tokens"].map { |t| t["token"] } - end - - def klass_document_type(klass, ignore_type = false) - @klass_document_type[[klass, ignore_type]] ||= begin - if klass.respond_to?(:document_type) - klass.document_type - elsif !ignore_type && klass.searchkick_klass.searchkick_options[:_type] - type = klass.searchkick_klass.searchkick_options[:_type] - type = type.call if type.respond_to?(:call) - type - else - klass.model_name.to_s.underscore - end - end - end - - protected - - def client - Searchkick.client - end - - def document_type(record, ignore_type = false) - if record.respond_to?(:search_document_type) - record.search_document_type - else - klass_document_type(record.class, ignore_type) - end - end - - def search_id(record) - id = record.respond_to?(:search_document_id) ? record.search_document_id : record.id - id.is_a?(Numeric) ? id : id.to_s - end - - EXCLUDED_ATTRIBUTES = ["_id", "_type"] - - def search_data(record, method_name = nil) - partial_reindex = !method_name.nil? - options = record.class.searchkick_options - - # remove _id since search_id is used instead - source = record.send(method_name || :search_data).each_with_object({}) { |(k, v), memo| memo[k.to_s] = v; memo }.except(*EXCLUDED_ATTRIBUTES) - - # conversions - if options[:conversions] - Array(options[:conversions]).map(&:to_s).each do |conversions_field| - if source[conversions_field] - source[conversions_field] = source[conversions_field].map { |k, v| {query: k, count: v} } - end - end - end - - # hack to prevent generator field doesn't exist error - if options[:suggest] - options[:suggest].map(&:to_s).each do |field| - source[field] = nil if !source[field] && !partial_reindex - end - end - - # locations - if options[:locations] - options[:locations].map(&:to_s).each do |field| - if source[field] - if !source[field].is_a?(Hash) && (source[field].first.is_a?(Array) || source[field].first.is_a?(Hash)) - # multiple locations - source[field] = source[field].map { |a| location_value(a) } - else - source[field] = location_value(source[field]) - end - end - end - end - - if !source.key?("type") && record.class.searchkick_klass.searchkick_options[:inheritance] - source["type"] = document_type(record, true) - end - - cast_big_decimal(source) - - source - end - - def location_value(value) - if value.is_a?(Array) - value.map(&:to_f).reverse - elsif value.is_a?(Hash) - {lat: value[:lat].to_f, lon: value[:lon].to_f} - else - value - end - end - - # change all BigDecimal values to floats due to - # https://github.com/rails/rails/issues/6033 - # possible loss of precision :/ - def cast_big_decimal(obj) - case obj - when BigDecimal - obj.to_f - when Hash - obj.each do |k, v| - obj[k] = cast_big_decimal(v) - end - when Enumerable - obj.map do |v| - cast_big_decimal(v) - end - else - obj - end - end - - def import_or_update(records, method_name, async) - if records.any? - if async - Searchkick::BulkReindexJob.perform_later( - class_name: records.first.class.name, - record_ids: records.map(&:id), - index_name: name, - method_name: method_name ? method_name.to_s : nil - ) - else - records = records.select(&:should_index?) - if records.any? - with_retries do - method_name ? bulk_update(records, method_name) : import(records) - end - end - end - end - end - - def full_reindex_async(scope) - if scope.respond_to?(:primary_key) - # TODO expire Redis key - primary_key = scope.primary_key - - starting_id = - begin - scope.minimum(primary_key) - rescue ActiveRecord::StatementInvalid - false - end - - if starting_id.nil? - # no records, do nothing - elsif starting_id.is_a?(Numeric) - max_id = scope.maximum(primary_key) - batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil - - batches_count.times do |i| - batch_id = i + 1 - min_id = starting_id + (i * batch_size) - bulk_reindex_job scope, batch_id, min_id: min_id, max_id: min_id + batch_size - 1 - end - else - scope.find_in_batches(batch_size: batch_size).each_with_index do |batch, i| - batch_id = i + 1 - - bulk_reindex_job scope, batch_id, record_ids: batch.map { |record| record.id.to_s } - end - end - else - batch_id = 1 - # TODO remove any eager loading - scope = scope.only(:_id) if scope.respond_to?(:only) - each_batch(scope) do |items| - bulk_reindex_job scope, batch_id, record_ids: items.map { |i| i.id.to_s } - batch_id += 1 - end - end - end - - def each_batch(scope) - # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb - # use cursor for Mongoid - items = [] - scope.all.each do |item| - items << item - if items.length == batch_size - yield items - items = [] - end - end - yield items if items.any? - end - - def bulk_reindex_job(scope, batch_id, options) - Searchkick::BulkReindexJob.perform_later({ - class_name: scope.model_name.name, - index_name: name, - batch_id: batch_id - }.merge(options)) - Searchkick.with_redis { |r| r.sadd(batches_key, batch_id) } - end - - def batch_size - @batch_size ||= @options[:batch_size] || 1000 - end - - def with_retries - retries = 0 - - begin - yield - rescue Faraday::ClientError => e - if retries < 1 - retries += 1 - retry - end - raise e - end - end - - def bulk_index_helper(records) - Searchkick.indexer.queue(records.map { |r| {index: record_data(r).merge(data: search_data(r))} }) - end - - def bulk_delete_helper(records) - Searchkick.indexer.queue(records.reject { |r| r.id.blank? }.map { |r| {delete: record_data(r)} }) - end - - def bulk_update_helper(records, method_name) - Searchkick.indexer.queue(records.map { |r| {update: record_data(r).merge(data: {doc: search_data(r, method_name)})} }) - end - - def batches_key - "searchkick:reindex:#{name}:batches" + raise e end end end