lib/searchkick/index.rb in searchkick-2.0.4 vs lib/searchkick/index.rb in searchkick-2.1.0

- old
+ new

@@ -246,41 +246,18 @@ true end end def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false) - batch_size = @options[:batch_size] || 1000 - # use scope for import scope = scope.search_import if scope.respond_to?(:search_import) if batch import_or_update scope.to_a, method_name, async - Searchkick.redis.srem(batches_key, batch_id) if batch_id && Searchkick.redis + redis.srem(batches_key, batch_id) if batch_id && redis elsif full && async - if scope.respond_to?(:primary_key) - # TODO expire Redis key - primary_key = scope.primary_key - starting_id = scope.minimum(primary_key) || 0 - max_id = scope.maximum(primary_key) || 0 - batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil - - batches_count.times do |i| - batch_id = i + 1 - min_id = starting_id + (i * batch_size) - Searchkick::BulkReindexJob.perform_later( - class_name: scope.model_name.name, - min_id: min_id, - max_id: min_id + batch_size - 1, - index_name: name, - batch_id: batch_id - ) - Searchkick.redis.sadd(batches_key, batch_id) if Searchkick.redis - end - else - raise Searchkick::Error, "async option only supported for ActiveRecord" - end + full_reindex_async(scope) elsif scope.respond_to?(:find_in_batches) if resume # use total docs instead of max id since there's not a great way # to get the max _id without scripting since it's a string @@ -292,27 +269,18 @@ scope.find_in_batches batch_size: batch_size do |batch| import_or_update batch, method_name, async end else - # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb - # use cursor for Mongoid - items = [] - # TODO add resume - scope.all.each do |item| - items << item - if items.length == batch_size - import_or_update items, method_name, async - items = [] - end + each_batch(scope) do |items| + import_or_update items, method_name, async end - import_or_update items, method_name, async end end def batches_left - Searchkick.redis.scard(batches_key) if Searchkick.redis + redis.scard(batches_key) if redis end # other def tokens(text, options = {}) @@ -430,22 +398,86 @@ record_ids: records.map(&:id), index_name: name, method_name: method_name ? method_name.to_s : nil ) else - retries = 0 records = records.select(&:should_index?) - begin - method_name ? bulk_update(records, method_name) : import(records) - rescue Faraday::ClientError => e - if retries < 1 - retries += 1 - retry + if records.any? + with_retries do + method_name ? bulk_update(records, method_name) : import(records) end - raise e end end end + end + + def full_reindex_async(scope) + if scope.respond_to?(:primary_key) + # TODO expire Redis key + primary_key = scope.primary_key + starting_id = scope.minimum(primary_key) || 0 + max_id = scope.maximum(primary_key) || 0 + batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil + + batches_count.times do |i| + batch_id = i + 1 + min_id = starting_id + (i * batch_size) + bulk_reindex_job scope, batch_id, min_id: min_id, max_id: min_id + batch_size - 1 + end + else + batch_id = 1 + # TODO remove any eager loading + scope = scope.only(:_id) if scope.respond_to?(:only) + each_batch(scope) do |items| + bulk_reindex_job scope, batch_id, record_ids: items.map { |i| i.id.to_s } + batch_id += 1 + end + end + end + + def each_batch(scope) + # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb + # use cursor for Mongoid + items = [] + scope.all.each do |item| + items << item + if items.length == batch_size + yield items + items = [] + end + end + yield items if items.any? + end + + def bulk_reindex_job(scope, batch_id, options) + Searchkick::BulkReindexJob.perform_later({ + class_name: scope.model_name.name, + index_name: name, + batch_id: batch_id + }.merge(options)) + redis.sadd(batches_key, batch_id) if redis + end + + def batch_size + @batch_size ||= @options[:batch_size] || 1000 + end + + def with_retries + retries = 0 + + begin + yield + rescue Faraday::ClientError => e + if retries < 1 + retries += 1 + retry + end + raise e + end + end + + def redis + Searchkick.redis end def batches_key "searchkick:reindex:#{name}:batches" end