lib/searchkick/index.rb in searchkick-2.0.4 vs lib/searchkick/index.rb in searchkick-2.1.0
- old
+ new
@@ -246,41 +246,18 @@
true
end
end
def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false)
- batch_size = @options[:batch_size] || 1000
-
# use scope for import
scope = scope.search_import if scope.respond_to?(:search_import)
if batch
import_or_update scope.to_a, method_name, async
- Searchkick.redis.srem(batches_key, batch_id) if batch_id && Searchkick.redis
+ redis.srem(batches_key, batch_id) if batch_id && redis
elsif full && async
- if scope.respond_to?(:primary_key)
- # TODO expire Redis key
- primary_key = scope.primary_key
- starting_id = scope.minimum(primary_key) || 0
- max_id = scope.maximum(primary_key) || 0
- batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil
-
- batches_count.times do |i|
- batch_id = i + 1
- min_id = starting_id + (i * batch_size)
- Searchkick::BulkReindexJob.perform_later(
- class_name: scope.model_name.name,
- min_id: min_id,
- max_id: min_id + batch_size - 1,
- index_name: name,
- batch_id: batch_id
- )
- Searchkick.redis.sadd(batches_key, batch_id) if Searchkick.redis
- end
- else
- raise Searchkick::Error, "async option only supported for ActiveRecord"
- end
+ full_reindex_async(scope)
elsif scope.respond_to?(:find_in_batches)
if resume
# use total docs instead of max id since there's not a great way
# to get the max _id without scripting since it's a string
@@ -292,27 +269,18 @@
scope.find_in_batches batch_size: batch_size do |batch|
import_or_update batch, method_name, async
end
else
- # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
- # use cursor for Mongoid
- items = []
- # TODO add resume
- scope.all.each do |item|
- items << item
- if items.length == batch_size
- import_or_update items, method_name, async
- items = []
- end
+ each_batch(scope) do |items|
+ import_or_update items, method_name, async
end
- import_or_update items, method_name, async
end
end
def batches_left
- Searchkick.redis.scard(batches_key) if Searchkick.redis
+ redis.scard(batches_key) if redis
end
# other
def tokens(text, options = {})
@@ -430,22 +398,86 @@
record_ids: records.map(&:id),
index_name: name,
method_name: method_name ? method_name.to_s : nil
)
else
- retries = 0
records = records.select(&:should_index?)
- begin
- method_name ? bulk_update(records, method_name) : import(records)
- rescue Faraday::ClientError => e
- if retries < 1
- retries += 1
- retry
+ if records.any?
+ with_retries do
+ method_name ? bulk_update(records, method_name) : import(records)
end
- raise e
end
end
end
+ end
+
+ def full_reindex_async(scope)
+ if scope.respond_to?(:primary_key)
+ # TODO expire Redis key
+ primary_key = scope.primary_key
+ starting_id = scope.minimum(primary_key) || 0
+ max_id = scope.maximum(primary_key) || 0
+ batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil
+
+ batches_count.times do |i|
+ batch_id = i + 1
+ min_id = starting_id + (i * batch_size)
+ bulk_reindex_job scope, batch_id, min_id: min_id, max_id: min_id + batch_size - 1
+ end
+ else
+ batch_id = 1
+ # TODO remove any eager loading
+ scope = scope.only(:_id) if scope.respond_to?(:only)
+ each_batch(scope) do |items|
+ bulk_reindex_job scope, batch_id, record_ids: items.map { |i| i.id.to_s }
+ batch_id += 1
+ end
+ end
+ end
+
+ def each_batch(scope)
+ # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
+ # use cursor for Mongoid
+ items = []
+ scope.all.each do |item|
+ items << item
+ if items.length == batch_size
+ yield items
+ items = []
+ end
+ end
+ yield items if items.any?
+ end
+
+ def bulk_reindex_job(scope, batch_id, options)
+ Searchkick::BulkReindexJob.perform_later({
+ class_name: scope.model_name.name,
+ index_name: name,
+ batch_id: batch_id
+ }.merge(options))
+ redis.sadd(batches_key, batch_id) if redis
+ end
+
+ def batch_size
+ @batch_size ||= @options[:batch_size] || 1000
+ end
+
+ def with_retries
+ retries = 0
+
+ begin
+ yield
+ rescue Faraday::ClientError => e
+ if retries < 1
+ retries += 1
+ retry
+ end
+ raise e
+ end
+ end
+
+ def redis
+ Searchkick.redis
end
def batches_key
"searchkick:reindex:#{name}:batches"
end